From ef7fb64764a9b9c77b0dbda48285f186cfcf5c4f Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Mon, 6 Nov 2023 16:00:56 +0000 Subject: [PATCH] HADOOP-18925. S3A: option to enable/disable CopyFromLocalOperation (#6163) Add a new option: fs.s3a.optimized.copy.from.local.enabled This will enable (default) or disable the optimized CopyFromLocalOperation upload operation when copyFromLocalFile() is invoked. When false the superclass implementation is used; duration statistics are still collected, though audit span entries in logs will be for the individual fs operations, not the overall operation. Contributed by Steve Loughran --- .../org/apache/hadoop/fs/s3a/Constants.java | 13 +++ .../apache/hadoop/fs/s3a/S3AFileSystem.java | 93 +++++++++++++------ .../tools/hadoop-aws/troubleshooting_s3a.md | 10 ++ .../fs/s3a/ITestS3ACopyFromLocalFile.java | 51 ++++++++++ 4 files changed, 138 insertions(+), 29 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index f4aeccf1efd10..36c98f2353bea 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -1347,4 +1347,17 @@ private Constants() { */ public static final boolean DIRECTORY_OPERATIONS_PURGE_UPLOADS_DEFAULT = false; + + /** + * Is the higher performance copy from local file to S3 enabled? + * This switch allows for it to be disabled if there are problems. + * Value: {@value}. + */ + public static final String OPTIMIZED_COPY_FROM_LOCAL = "fs.s3a.optimized.copy.from.local.enabled"; + + /** + * Default value for {@link #OPTIMIZED_COPY_FROM_LOCAL}. + * Value: {@value}. + */ + public static final boolean OPTIMIZED_COPY_FROM_LOCAL_DEFAULT = true; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index f96a378b1cc92..b978707ce00ec 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -462,6 +462,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, */ private String scheme = FS_S3A; + /** + * Flag to indicate that the higher performance copyFromLocalFile implementation + * should be used. + */ + private boolean optimizedCopyFromLocal; + /** Add any deprecated keys. */ @SuppressWarnings("deprecation") private static void addDeprecatedKeys() { @@ -696,6 +702,9 @@ public void initialize(URI name, Configuration originalConf) AWS_S3_VECTOR_ACTIVE_RANGE_READS, DEFAULT_AWS_S3_VECTOR_ACTIVE_RANGE_READS, 1); vectoredIOContext = populateVectoredIOContext(conf); scheme = (this.uri != null && this.uri.getScheme() != null) ? this.uri.getScheme() : FS_S3A; + optimizedCopyFromLocal = conf.getBoolean(OPTIMIZED_COPY_FROM_LOCAL, + OPTIMIZED_COPY_FROM_LOCAL_DEFAULT); + LOG.debug("Using optimized copyFromLocal implementation: {}", optimizedCopyFromLocal); } catch (SdkException e) { // amazon client exception: stop all services then throw the translation cleanupWithLogger(LOG, span); @@ -4021,9 +4030,9 @@ private boolean s3Exists(final Path path, final Set probes) * the given dst name. * * This version doesn't need to create a temporary file to calculate the md5. - * Sadly this doesn't seem to be used by the shell cp :( + * If {@link Constants#OPTIMIZED_COPY_FROM_LOCAL} is set to false, + * the superclass implementation is used. * - * delSrc indicates if the source should be removed * @param delSrc whether to delete the src * @param overwrite whether to overwrite an existing file * @param src path @@ -4031,35 +4040,59 @@ private boolean s3Exists(final Path path, final Set probes) * @throws IOException IO problem * @throws FileAlreadyExistsException the destination file exists and * overwrite==false - * @throws SdkException failure in the AWS SDK */ @Override @AuditEntryPoint public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst) throws IOException { checkNotClosed(); - LOG.debug("Copying local file from {} to {}", src, dst); - trackDurationAndSpan(INVOCATION_COPY_FROM_LOCAL_FILE, dst, - () -> new CopyFromLocalOperation( - createStoreContext(), - src, - dst, - delSrc, - overwrite, - createCopyFromLocalCallbacks()).execute()); + LOG.debug("Copying local file from {} to {} (delSrc={} overwrite={}", + src, dst, delSrc, overwrite); + if (optimizedCopyFromLocal) { + trackDurationAndSpan(INVOCATION_COPY_FROM_LOCAL_FILE, dst, () -> + new CopyFromLocalOperation( + createStoreContext(), + src, + dst, + delSrc, + overwrite, + createCopyFromLocalCallbacks(getActiveAuditSpan())) + .execute()); + } else { + // call the superclass, but still count statistics. + // there is no overall span here, as each FS API call will + // be in its own span. + LOG.debug("Using base copyFromLocalFile implementation"); + trackDurationAndSpan(INVOCATION_COPY_FROM_LOCAL_FILE, dst, () -> { + super.copyFromLocalFile(delSrc, overwrite, src, dst); + return null; + }); + } } + /** + * Create the CopyFromLocalCallbacks; + * protected to assist in mocking. + * @param span audit span. + * @return the callbacks + * @throws IOException failure to get the local fs. + */ protected CopyFromLocalOperation.CopyFromLocalOperationCallbacks - createCopyFromLocalCallbacks() throws IOException { + createCopyFromLocalCallbacks(final AuditSpanS3A span) throws IOException { LocalFileSystem local = getLocal(getConf()); - return new CopyFromLocalCallbacksImpl(local); + return new CopyFromLocalCallbacksImpl(span, local); } protected final class CopyFromLocalCallbacksImpl implements CopyFromLocalOperation.CopyFromLocalOperationCallbacks { + + /** Span to use for all operations. */ + private final AuditSpanS3A span; private final LocalFileSystem local; - private CopyFromLocalCallbacksImpl(LocalFileSystem local) { + private CopyFromLocalCallbacksImpl(final AuditSpanS3A span, + LocalFileSystem local) { + this.span = span; this.local = local; } @@ -4081,20 +4114,18 @@ public boolean deleteLocal(Path path, boolean recursive) throws IOException { @Override public void copyLocalFileFromTo(File file, Path from, Path to) throws IOException { - trackDurationAndSpan( - OBJECT_PUT_REQUESTS, - to, - () -> { - final String key = pathToKey(to); - Progressable progress = null; - PutObjectRequest.Builder putObjectRequestBuilder = - newPutObjectRequestBuilder(key, file.length(), false); - S3AFileSystem.this.invoker.retry("putObject(" + "" + ")", to.toString(), true, - () -> executePut(putObjectRequestBuilder.build(), progress, putOptionsForPath(to), - file)); - - return null; - }); + // the duration of the put is measured, but the active span is the + // constructor-supplied one -this ensures all audit log events are grouped correctly + span.activate(); + trackDuration(getDurationTrackerFactory(), OBJECT_PUT_REQUESTS.getSymbol(), () -> { + final String key = pathToKey(to); + PutObjectRequest.Builder putObjectRequestBuilder = + newPutObjectRequestBuilder(key, file.length(), false); + final String dest = to.toString(); + S3AFileSystem.this.invoker.retry("putObject(" + dest + ")", dest, true, () -> + executePut(putObjectRequestBuilder.build(), null, putOptionsForPath(to), file)); + return null; + }); } @Override @@ -5399,6 +5430,10 @@ public boolean hasPathCapability(final Path path, final String capability) case FS_S3A_CREATE_PERFORMANCE_ENABLED: return performanceCreation; + // is the optimized copy from local enabled. + case OPTIMIZED_COPY_FROM_LOCAL: + return optimizedCopyFromLocal; + default: return super.hasPathCapability(p, cap); } diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md index 8c57db1faf4c1..be359336463ad 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md @@ -1544,3 +1544,13 @@ software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP When this happens, try to set `fs.s3a.connection.request.timeout` to a larger value or disable it completely by setting it to `0`. + +### Debugging Switches + +There are some switches which can be set to enable/disable features and assist +in isolating problems and at least make them "go away". + + +| Key | Default | Action | +|------|---------|----------| +| `fs.s3a.optimized.copy.from.local.enabled` | `true` | [HADOOP-18925](https://issues.apache.org/jira/browse/HADOOP-18925) enable/disable CopyFromLocalOperation. Also a path capability. | diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ACopyFromLocalFile.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ACopyFromLocalFile.java index dfac771dd78dc..f9600de6d20cf 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ACopyFromLocalFile.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ACopyFromLocalFile.java @@ -19,6 +19,8 @@ package org.apache.hadoop.fs.s3a; import java.io.File; +import java.util.Arrays; +import java.util.Collection; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.contract.AbstractContractCopyFromLocalTest; @@ -26,18 +28,67 @@ import org.apache.hadoop.fs.contract.s3a.S3AContract; import org.apache.hadoop.fs.Path; + +import org.assertj.core.api.Assertions; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import static org.apache.hadoop.fs.s3a.Constants.OPTIMIZED_COPY_FROM_LOCAL; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; import static org.apache.hadoop.test.LambdaTestUtils.intercept; +/** + * Test copying files from the local filesystem to S3A. + * Parameterized on whether or not the optimized + * copyFromLocalFile is enabled. + */ +@RunWith(Parameterized.class) public class ITestS3ACopyFromLocalFile extends AbstractContractCopyFromLocalTest { + /** + * Parameterization. + */ + @Parameterized.Parameters(name = "enabled={0}") + public static Collection params() { + return Arrays.asList(new Object[][]{ + {true}, + {false}, + }); + } + private final boolean enabled; + + public ITestS3ACopyFromLocalFile(final boolean enabled) { + this.enabled = enabled; + } + + @Override + protected Configuration createConfiguration() { + final Configuration conf = super.createConfiguration(); + + removeBaseAndBucketOverrides(getTestBucketName(conf), conf, + OPTIMIZED_COPY_FROM_LOCAL); + conf.setBoolean(OPTIMIZED_COPY_FROM_LOCAL, enabled); + disableFilesystemCaching(conf); + return conf; + } @Override protected AbstractFSContract createContract(Configuration conf) { return new S3AContract(conf); } + @Test + public void testOptionPropagation() throws Throwable { + Assertions.assertThat(getFileSystem().hasPathCapability(new Path("/"), + OPTIMIZED_COPY_FROM_LOCAL)) + .describedAs("path capability of %s", OPTIMIZED_COPY_FROM_LOCAL) + .isEqualTo(enabled); + + } + @Test public void testLocalFilesOnly() throws Throwable { describe("Copying into other file systems must fail");