Skip to content

Commit

Permalink
HADOOP-18930. Make fs.s3a.create.performance a bucket-wide setting. (#…
Browse files Browse the repository at this point in the history
…6168)


If fs.s3a.create.performance is set on a bucket
- All file overwrite checks are skipped, even if the caller says
  otherwise.
- All directory existence checks are skipped.
- Marker deletion is *always* skipped.

This eliminates a HEAD and a LIST for every creation.

* New path capability "fs.s3a.create.performance.enabled" true
  if the option is enabled.
* Parameterize ITestS3AContractCreate to expect the different
  outcomes
* Parameterize ITestCreateFileCost similarly, with
  changed cost assertions there.
* create(/) raises an IOE. existing bug only noticed here.

Contributed by Steve Loughran
  • Loading branch information
steveloughran authored Oct 27, 2023
1 parent 93a3c6e commit 7ec636d
Show file tree
Hide file tree
Showing 14 changed files with 356 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,12 @@ be used as evidence at the inquest as proof that they made a
conscious decision to choose speed over safety and
that the outcome was their own fault.

Note: the option can be set for an entire filesystem. Again, the safety checks
are there to more closely match the semantics of a classic filesystem,
and to reduce the likelihood that the object store ends up in a state which
diverges so much from the classic directory + tree structur that applications
get confused.

Accordingly: *Use if and only if you are confident that the conditions are met.*

### `fs.s3a.create.header` User-supplied header support
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1192,12 +1192,26 @@ private Constants() {

/**
* Flag for create performance.
* This is *not* a configuration option; it is for use in the
* {code createFile()} builder.
* This can be set in the {code createFile()} builder.
* Value {@value}.
*/
public static final String FS_S3A_CREATE_PERFORMANCE = "fs.s3a.create.performance";

/**
* Default value for create performance in an S3A FS.
* Value {@value}.
*/
public static final boolean FS_S3A_CREATE_PERFORMANCE_DEFAULT = true;


/**
* Capability to indicate that the FS has been instantiated with
* {@link #FS_S3A_CREATE_PERFORMANCE} set to true.
* Value {@value}.
*/
public static final String FS_S3A_CREATE_PERFORMANCE_ENABLED =
FS_S3A_CREATE_PERFORMANCE + ".enabled";

/**
* Prefix for adding a header to the object when created.
* The actual value must have a "." suffix and then the actual header.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
import static org.apache.hadoop.fs.s3a.impl.CreateFileBuilder.OPTIONS_CREATE_FILE_NO_OVERWRITE;
import static org.apache.hadoop.fs.s3a.impl.CreateFileBuilder.OPTIONS_CREATE_FILE_OVERWRITE;
import static org.apache.hadoop.fs.s3a.impl.CreateFileBuilder.OPTIONS_CREATE_FILE_PERFORMANCE;
import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isObjectNotFound;
import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isUnknownBucket;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.AP_REQUIRED_EXCEPTION;
Expand Down Expand Up @@ -348,7 +349,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
private S3AStatisticsContext statisticsContext;
/** Storage Statistics Bonded to the instrumentation. */
private S3AStorageStatistics storageStatistics;

/** Should all create files be "performance" unless unset. */
private boolean performanceCreation;
/**
* Default input policy; may be overridden in
* {@code openFile()}.
Expand Down Expand Up @@ -660,6 +662,11 @@ public void initialize(URI name, Configuration originalConf)
// verify there's no S3Guard in the store config.
checkNoS3Guard(this.getUri(), getConf());

// performance creation flag for code which wants performance
// at the risk of overwrites.
performanceCreation = conf.getBoolean(FS_S3A_CREATE_PERFORMANCE,
FS_S3A_CREATE_PERFORMANCE_DEFAULT);
LOG.debug("{} = {}", FS_S3A_CREATE_PERFORMANCE, performanceCreation);
allowAuthoritativePaths = S3Guard.getAuthoritativePaths(this);

// directory policy, which may look at authoritative paths
Expand Down Expand Up @@ -1878,14 +1885,22 @@ public FSDataOutputStream create(Path f, FsPermission permission,
Progressable progress) throws IOException {
final Path path = qualify(f);

// work out the options to pass down
CreateFileBuilder.CreateFileOptions options;
if (performanceCreation) {
options = OPTIONS_CREATE_FILE_PERFORMANCE;
}else {
options = overwrite
? OPTIONS_CREATE_FILE_OVERWRITE
: OPTIONS_CREATE_FILE_NO_OVERWRITE;
}

// the span will be picked up inside the output stream
return trackDurationAndSpan(INVOCATION_CREATE, path, () ->
innerCreateFile(path,
progress,
getActiveAuditSpan(),
overwrite
? OPTIONS_CREATE_FILE_OVERWRITE
: OPTIONS_CREATE_FILE_NO_OVERWRITE));
options));
}

/**
Expand All @@ -1912,14 +1927,19 @@ private FSDataOutputStream innerCreateFile(
final CreateFileBuilder.CreateFileOptions options) throws IOException {
auditSpan.activate();
String key = pathToKey(path);
if (key.isEmpty()) {
// no matter the creation options, root cannot be written to.
throw new PathIOException("/", "Can't create root path");
}
EnumSet<CreateFlag> flags = options.getFlags();
boolean overwrite = flags.contains(CreateFlag.OVERWRITE);
boolean performance = options.isPerformance();
boolean skipProbes = performance || isUnderMagicCommitPath(path);

boolean skipProbes = options.isPerformance() || isUnderMagicCommitPath(path);
if (skipProbes) {
LOG.debug("Skipping existence/overwrite checks");
} else {
try {
boolean overwrite = flags.contains(CreateFlag.OVERWRITE);

// get the status or throw an FNFE.
// when overwriting, there is no need to look for any existing file,
// just a directory (for safety)
Expand Down Expand Up @@ -1951,7 +1971,7 @@ private FSDataOutputStream innerCreateFile(

// put options are derived from the path and the
// option builder.
boolean keep = performance || keepDirectoryMarkers(path);
boolean keep = options.isPerformance() || keepDirectoryMarkers(path);
final PutObjectOptions putOptions =
new PutObjectOptions(keep, null, options.getHeaders());

Expand Down Expand Up @@ -2034,11 +2054,14 @@ public FSDataOutputStreamBuilder createFile(final Path path) {
final AuditSpan span = entryPoint(INVOCATION_CREATE_FILE,
pathToKey(qualified),
null);
return new CreateFileBuilder(this,
final CreateFileBuilder builder = new CreateFileBuilder(this,
qualified,
new CreateFileBuilderCallbacksImpl(INVOCATION_CREATE_FILE, span))
.create()
.overwrite(true);
new CreateFileBuilderCallbacksImpl(INVOCATION_CREATE_FILE, span));
builder
.create()
.overwrite(true)
.must(FS_S3A_CREATE_PERFORMANCE, performanceCreation);
return builder;
} catch (IOException e) {
// catch any IOEs raised in span creation and convert to
// an UncheckedIOException
Expand Down Expand Up @@ -2101,7 +2124,8 @@ public FSDataOutputStream createNonRecursive(Path p,
.create()
.withFlags(flags)
.blockSize(blockSize)
.bufferSize(bufferSize);
.bufferSize(bufferSize)
.must(FS_S3A_CREATE_PERFORMANCE, performanceCreation);
if (progress != null) {
builder.progress(progress);
}
Expand Down Expand Up @@ -5371,6 +5395,10 @@ public boolean hasPathCapability(final Path path, final String capability)
case FS_S3A_CREATE_HEADER:
return true;

// is the FS configured for create file performance
case FS_S3A_CREATE_PERFORMANCE_ENABLED:
return performanceCreation;

default:
return super.hasPathCapability(p, cap);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ public class CreateFileBuilder extends
public static final CreateFileOptions OPTIONS_CREATE_FILE_NO_OVERWRITE =
new CreateFileOptions(CREATE_NO_OVERWRITE_FLAGS, true, false, null);

/**
* Performance create options.
*/
public static final CreateFileOptions OPTIONS_CREATE_FILE_PERFORMANCE =
new CreateFileOptions(CREATE_OVERWRITE_FLAGS, true, true, null);

/**
* Callback interface.
*/
Expand Down Expand Up @@ -129,6 +135,7 @@ public FSDataOutputStream build() throws IOException {
if (flags.contains(CreateFlag.APPEND)) {
throw new UnsupportedOperationException("Append is not supported");
}

if (!flags.contains(CreateFlag.CREATE) &&
!flags.contains(CreateFlag.OVERWRITE)) {
throw new PathIOException(path.toString(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,111 @@

package org.apache.hadoop.fs.contract.s3a;

import java.util.Arrays;
import java.util.Collection;

import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractCreateTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.s3a.S3ATestUtils;

import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;

/**
* S3A contract tests creating files.
* Parameterized on the create performance flag as all overwrite
* tests are required to fail in create performance mode.
*/
@RunWith(Parameterized.class)
public class ITestS3AContractCreate extends AbstractContractCreateTest {

/**
* This test suite is parameterized for the different create file
* options.
* @return a list of test parameters.
*/
@Parameterized.Parameters
public static Collection<Object[]> params() {
return Arrays.asList(new Object[][]{
{false},
{true}
});
}

/**
* Is this test run in create performance mode?
*/
private final boolean createPerformance;

public ITestS3AContractCreate(final boolean createPerformance) {
this.createPerformance = createPerformance;
}

@Override
protected AbstractFSContract createContract(Configuration conf) {
return new S3AContract(conf);
}

@Override
protected Configuration createConfiguration() {
final Configuration conf = super.createConfiguration();
removeBaseAndBucketOverrides(conf,
FS_S3A_CREATE_PERFORMANCE);
conf.setBoolean(FS_S3A_CREATE_PERFORMANCE, createPerformance);
S3ATestUtils.disableFilesystemCaching(conf);
return conf;
}

@Override
public void testOverwriteNonEmptyDirectory() throws Throwable {
try {
super.testOverwriteNonEmptyDirectory();
failWithCreatePerformance();
} catch (AssertionError e) {
swallowWithCreatePerformance(e);
}
}

@Override
public void testOverwriteEmptyDirectory() throws Throwable {
try {
super.testOverwriteEmptyDirectory();
failWithCreatePerformance();
} catch (AssertionError e) {
swallowWithCreatePerformance(e);
}
}

@Override
public void testCreateFileOverExistingFileNoOverwrite() throws Throwable {
try {
super.testCreateFileOverExistingFileNoOverwrite();
failWithCreatePerformance();
} catch (AssertionError e) {
swallowWithCreatePerformance(e);
}
}

private void failWithCreatePerformance() {
if (createPerformance) {
fail("expected an assertion error in create performance mode");
}
}

/**
* Swallow an assertion error if the create performance flag is set.
* @param e assertion error
*/
private void swallowWithCreatePerformance(final AssertionError e) {
// this is expected in create performance modea
if (!createPerformance) {
// but if the create performance flag is set, then it is supported
// and the assertion error is unexpected
throw e;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

package org.apache.hadoop.fs.s3a;

import java.io.IOException;

import org.assertj.core.api.Assertions;
import org.junit.Ignore;

import org.apache.hadoop.conf.Configuration;
Expand All @@ -27,6 +30,7 @@
import org.apache.hadoop.fs.contract.s3a.S3AContract;

import static org.apache.hadoop.fs.s3a.S3ATestUtils.createTestPath;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.isCreatePerformanceEnabled;

/**
* S3A Test suite for the FSMainOperationsBaseTest tests.
Expand Down Expand Up @@ -70,4 +74,20 @@ public void testCopyToLocalWithUseRawLocalFileSystemOption()
throws Exception {
}

@Override
public void testOverwrite() throws IOException {
boolean createPerformance = isCreatePerformanceEnabled(fSys);
try {
super.testOverwrite();
Assertions.assertThat(createPerformance)
.describedAs("create performance enabled")
.isFalse();
} catch (AssertionError e) {
// swallow the exception if create performance is enabled,
// else rethrow
if (!createPerformance) {
throw e;
}
}
}
}
Loading

0 comments on commit 7ec636d

Please sign in to comment.