Skip to content

Commit

Permalink
HADOOP-18908. Improve S3A region handling. (#6187)
Browse files Browse the repository at this point in the history
The S3A region logic has been improved for better inference and
for compatibility with previous releases.

1. If you are using an AWS S3 AccessPoint, its region is determined
   from the ARN itself.
2. If fs.s3a.endpoint.region is set and non-empty, it is used.
3. If fs.s3a.endpoint is an s3.*.amazonaws.com URL,
   the region is determined by parsing the URL.
   Note: vpce endpoints are not handled by this.
4. If fs.s3a.endpoint.region==null, and none could be determined
   from the endpoint, use us-east-2 as default.
5. If fs.s3a.endpoint.region=="" then it is handed off to
   the default AWS SDK resolution process.

Consult the AWS SDK documentation for the details on its resolution
process, knowing that it is complicated and may use environment variables,
entries in ~/.aws/config, IAM instance information within
EC2 deployments and possibly even JSON resources on the classpath.
Put differently: it is somewhat brittle across deployments.

Contributed by Ahmar Suhail
  • Loading branch information
steveloughran authored Oct 17, 2023
1 parent e5eb404 commit e0563fe
Show file tree
Hide file tree
Showing 11 changed files with 269 additions and 179 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -407,10 +407,6 @@ public final class StoreStatisticNames {
public static final String MULTIPART_UPLOAD_LIST
= "multipart_upload_list";

/** Probe for store region: {@value}. */
public static final String STORE_REGION_PROBE
= "store_region_probe";

private StoreStatisticNames() {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1179,6 +1179,12 @@ private Constants() {
*/
public static final String AWS_S3_CENTRAL_REGION = "us-east-1";

/**
* The default S3 region, used when no region is configured in
* fs.s3a.endpoint.region and none can be determined from the endpoint.
* The client factory enables cross region access when it falls back to
* this region.
* Value {@value}.
*/
public static final String AWS_S3_DEFAULT_REGION = "us-east-2";

/**
* Require that all S3 access is made through Access Points.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import software.amazon.awssdk.awscore.util.AwsHostNameUtils;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
Expand All @@ -48,6 +49,9 @@
import org.apache.hadoop.fs.s3a.statistics.impl.AwsStatisticsCollector;
import org.apache.hadoop.fs.store.LogExactlyOnce;

import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_DEFAULT_REGION;
import static org.apache.hadoop.fs.s3a.Constants.CENTRAL_ENDPOINT;
import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.REQUESTER_PAYS_HEADER;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_SECURE_CONNECTIONS;
import static org.apache.hadoop.fs.s3a.Constants.SECURE_CONNECTIONS;
Expand All @@ -66,12 +70,27 @@ public class DefaultS3ClientFactory extends Configured

private static final String REQUESTER_PAYS_HEADER_VALUE = "requester";

private static final String S3_SERVICE_NAME = "s3";

/**
* Subclasses refer to this.
*/
protected static final Logger LOG =
LoggerFactory.getLogger(DefaultS3ClientFactory.class);

/**
* A one-off warning of default region chains in use.
*/
private static final LogExactlyOnce WARN_OF_DEFAULT_REGION_CHAIN =
new LogExactlyOnce(LOG);

/**
* Warning message printed when the SDK Region chain is in use.
*/
private static final String SDK_REGION_CHAIN_IN_USE =
"S3A filesystem client is using"
+ " the SDK region resolution chain.";


/** Exactly once log to inform about ignoring the AWS-SDK Warnings for CSE. */
private static final LogExactlyOnce IGNORE_CSE_WARN = new LogExactlyOnce(LOG);
Expand Down Expand Up @@ -138,15 +157,7 @@ private <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT> Build
BuilderT builder, S3ClientCreationParameters parameters, Configuration conf, String bucket)
throws IOException {

Region region = parameters.getRegion();
LOG.debug("Using region {}", region);

URI endpoint = getS3Endpoint(parameters.getEndpoint(), conf);

if (endpoint != null) {
builder.endpointOverride(endpoint);
LOG.debug("Using endpoint {}", endpoint);
}
configureEndpointAndRegion(builder, parameters, conf);

S3Configuration serviceConfiguration = S3Configuration.builder()
.pathStyleAccessEnabled(parameters.isPathStyleAccess())
Expand All @@ -155,7 +166,6 @@ private <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT> Build
return builder
.overrideConfiguration(createClientOverrideConfiguration(parameters, conf))
.credentialsProvider(parameters.getCredentialSet())
.region(region)
.serviceConfiguration(serviceConfiguration);
}

Expand Down Expand Up @@ -201,6 +211,72 @@ protected ClientOverrideConfiguration createClientOverrideConfiguration(
return clientOverrideConfigBuilder.build();
}

/**
 * Bind the endpoint and the region of an S3 client builder.
 * <p>
 * If an endpoint is configured via fs.s3a.endpoint it is always set as the
 * endpoint override. The region is then chosen in this order:
 *
 * <ol>
 *   <li>A non-empty region from the creation parameters
 *       (fs.s3a.endpoint.region) is used as is.</li>
 *   <li>Otherwise, when an endpoint is present, an attempt is made to
 *       parse the region from that endpoint.</li>
 *   <li>If the region option is unset (null) and nothing could be parsed
 *       from the endpoint, the default region US_EAST_2 is used and cross
 *       region access is enabled.</li>
 *   <li>If the region option is the empty string and nothing could be
 *       parsed from the endpoint, no region is set on the builder and the
 *       SDK region resolution chain is relied upon.</li>
 * </ol>
 *
 * @param builder S3 client builder.
 * @param parameters client creation parameters.
 * @param conf configuration object.
 * @param <BuilderT> S3 client builder type.
 * @param <ClientT> S3 client type.
 */
private <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT>
    void configureEndpointAndRegion(
        BuilderT builder, S3ClientCreationParameters parameters, Configuration conf) {
  final URI endpointUri = getS3Endpoint(parameters.getEndpoint(), conf);

  final String regionOption = parameters.getRegion();
  final boolean regionUnset = regionOption == null;
  final boolean regionBlank = !regionUnset && regionOption.isEmpty();

  Region chosen = null;
  String source = "";

  // an explicitly configured region always wins.
  if (!regionUnset && !regionBlank) {
    chosen = Region.of(regionOption);
    source = AWS_REGION;
  }

  if (endpointUri != null) {
    builder.endpointOverride(endpointUri);
    if (chosen == null) {
      // nothing configured: see whether the endpoint reveals the region.
      chosen = getS3RegionFromEndpoint(parameters.getEndpoint());
      if (chosen != null) {
        source = "endpoint";
      }
    }
    LOG.debug("Setting endpoint to {}", endpointUri);
  }

  if (chosen == null && regionUnset) {
    // no region option at all, and the endpoint did not help:
    // fall back to US_EAST_2 with cross region access switched on.
    chosen = Region.of(AWS_S3_DEFAULT_REGION);
    builder.crossRegionAccessEnabled(true);
    source = "cross region access fallback";
  }

  if (chosen != null) {
    builder.region(chosen);
  } else if (regionBlank) {
    // the region option was deliberately set to the empty string.
    // This is permitted for those who really want it; relying on the
    // SDK chain is OK when deployed in EC2.
    WARN_OF_DEFAULT_REGION_CHAIN.warn(SDK_REGION_CHAIN_IN_USE);
    LOG.debug(SDK_REGION_CHAIN_IN_USE);
    source = "SDK region chain";
  }

  LOG.debug("Setting region to {} from {}", chosen, source);
}

/**
* Given a endpoint string, create the endpoint URI.
*
Expand Down Expand Up @@ -229,4 +305,23 @@ private static URI getS3Endpoint(String endpoint, final Configuration conf) {
throw new IllegalArgumentException(e);
}
}

/**
 * Work out the S3 region from an endpoint string.
 * An endpoint ending with the central endpoint maps to US_EAST_1;
 * any other endpoint is handed to the SDK host name parser.
 *
 * @param endpoint the configured endpoint.
 * @return the S3 region, or null if it cannot be resolved from the endpoint.
 */
private static Region getS3RegionFromEndpoint(String endpoint) {
  if (endpoint.endsWith(CENTRAL_ENDPOINT)) {
    // the central endpoint is for US_EAST_1.
    return Region.US_EAST_1;
  }

  LOG.debug("Endpoint {} is not the default; parsing", endpoint);
  return AwsHostNameUtils
      .parseSigningRegion(endpoint, S3_SERVICE_NAME)
      .orElse(null);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
Expand All @@ -54,7 +53,6 @@

import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
Expand Down Expand Up @@ -83,7 +81,6 @@
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Error;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.model.S3Object;
import software.amazon.awssdk.services.s3.model.SelectObjectContentRequest;
import software.amazon.awssdk.services.s3.model.SelectObjectContentResponseHandler;
Expand All @@ -98,7 +95,6 @@
import software.amazon.awssdk.transfer.s3.model.FileUpload;
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -246,7 +242,6 @@
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DELETE_CONSIDERED_IDEMPOTENT;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_301_MOVED_PERMANENTLY;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_403_FORBIDDEN;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404_NOT_FOUND;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
Expand Down Expand Up @@ -332,8 +327,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
private int executorCapacity;
private long multiPartThreshold;
public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
/** Exactly once log to warn about setting the region in config to avoid probe. */
private static final LogExactlyOnce SET_REGION_WARNING = new LogExactlyOnce(LOG);

/** Log to warn of storage class configuration problems. */
private static final LogExactlyOnce STORAGE_CLASS_WARNING = new LogExactlyOnce(LOG);
Expand Down Expand Up @@ -461,8 +454,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
*/
private String scheme = FS_S3A;

private final static Map<String, Region> BUCKET_REGIONS = new HashMap<>();

/** Add any deprecated keys. */
@SuppressWarnings("deprecation")
private static void addDeprecatedKeys() {
Expand Down Expand Up @@ -870,9 +861,6 @@ protected void verifyBucketExists() throws UnknownStoreException, IOException {
STORE_EXISTS_PROBE, bucket, null, () ->
invoker.retry("doesBucketExist", bucket, true, () -> {
try {
if (BUCKET_REGIONS.containsKey(bucket)) {
return true;
}
s3Client.headBucket(HeadBucketRequest.builder().bucket(bucket).build());
return true;
} catch (AwsServiceException ex) {
Expand Down Expand Up @@ -982,8 +970,6 @@ private void bindAWSClient(URI name, boolean dtEnabled) throws IOException {
? conf.getTrimmed(AWS_REGION)
: accessPoint.getRegion();

Region region = getS3Region(configuredRegion);

S3ClientFactory.S3ClientCreationParameters parameters =
new S3ClientFactory.S3ClientCreationParameters()
.withCredentialSet(credentials)
Expand All @@ -998,7 +984,7 @@ private void bindAWSClient(URI name, boolean dtEnabled) throws IOException {
.withMultipartCopyEnabled(isMultipartCopyEnabled)
.withMultipartThreshold(multiPartThreshold)
.withTransferManagerExecutor(unboundedThreadPool)
.withRegion(region);
.withRegion(configuredRegion);

S3ClientFactory clientFactory = ReflectionUtils.newInstance(s3ClientFactoryClass, conf);
s3Client = clientFactory.createS3Client(getUri(), parameters);
Expand All @@ -1019,75 +1005,6 @@ private void createS3AsyncClient(S3ClientFactory clientFactory,
s3AsyncClient = clientFactory.createS3AsyncClient(getUri(), parameters);
}

/**
* Get the bucket region.
* A non-blank configured region is returned directly; otherwise the
* static BUCKET_REGIONS cache is consulted, and only on a cache miss is
* the store probed with a HEAD bucket request.
*
* @param region AWS S3 Region set in the config. This property may not be set, in which case
* ask S3 for the region.
* @return region of the bucket.
* @throws IOException if the region probe fails; a 404 response from the
* store is raised as an UnknownStoreException.
*/
private Region getS3Region(String region) throws IOException {

// a region set in the configuration is used without any network call.
if (!StringUtils.isBlank(region)) {
return Region.of(region);
}

// NOTE(review): BUCKET_REGIONS is a plain static HashMap and is read and
// written here without synchronization; confirm this is never reached
// concurrently from multiple filesystem instances.
Region cachedRegion = BUCKET_REGIONS.get(bucket);

if (cachedRegion != null) {
LOG.debug("Got region {} for bucket {} from cache", cachedRegion, bucket);
return cachedRegion;
}

// cache miss: probe the store, tracking the duration under STORE_REGION_PROBE
// and invoking the probe through invoker.retry().
Region s3Region = trackDurationAndSpan(STORE_REGION_PROBE, bucket, null,
() -> invoker.retry("getS3Region", bucket, true, () -> {
try {

SET_REGION_WARNING.warn(
"Getting region for bucket {} from S3, this will slow down FS initialisation. "
+ "To avoid this, set the region using property {}", bucket,
FS_S3A_BUCKET_PREFIX + bucket + ".endpoint.region");

// build a s3 client with region eu-west-1 that can be used to get the region of the
// bucket. Using eu-west-1, as headBucket() doesn't work with us-east-1. This is because
// us-east-1 uses the endpoint s3.amazonaws.com, which resolves bucket.s3.amazonaws.com
// to the actual region the bucket is in. As the request is signed with us-east-1 and
// not the bucket's region, it fails.
S3Client getRegionS3Client =
S3Client.builder().region(Region.EU_WEST_1).credentialsProvider(credentials)
.build();

HeadBucketResponse headBucketResponse =
getRegionS3Client.headBucket(HeadBucketRequest.builder().bucket(bucket).build());

// success: the region is read from the first value of the
// BUCKET_REGION_HEADER response header, then cached.
Region bucketRegion = Region.of(
headBucketResponse.sdkHttpResponse().headers().get(BUCKET_REGION_HEADER).get(0));
BUCKET_REGIONS.put(bucket, bucketRegion);

return bucketRegion;
} catch (S3Exception exception) {
// a 301 is not treated as a failure here: the region is taken from the
// BUCKET_REGION_HEADER header of the error response and cached.
if (exception.statusCode() == SC_301_MOVED_PERMANENTLY) {
Region bucketRegion = Region.of(
exception.awsErrorDetails().sdkHttpResponse().headers().get(BUCKET_REGION_HEADER)
.get(0));
BUCKET_REGIONS.put(bucket, bucketRegion);

return bucketRegion;
}

// a 404 is converted into an UnknownStoreException naming the bucket.
if (exception.statusCode() == SC_404_NOT_FOUND) {
throw new UnknownStoreException("s3a://" + bucket + "/",
" Bucket does not exist: " + exception,
exception);
}

// any other S3 failure is rethrown unchanged.
throw exception;
}
}));

return s3Region;
}

/**
* Initialize and launch the audit manager and service.
* As this takes the FS IOStatistics store, it must be invoked
Expand Down
Loading

0 comments on commit e0563fe

Please sign in to comment.