Skip to content

Commit

Permalink
HADOOP-19102. [ABFS] FooterReadBufferSize should not be greater than …
Browse files Browse the repository at this point in the history
…readBufferSize (#6617)


Contributed by  Pranav Saxena
  • Loading branch information
saxenapranav authored Apr 22, 2024
1 parent eec9cd2 commit 6404692
Show file tree
Hide file tree
Showing 10 changed files with 746 additions and 270 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand All @@ -34,6 +38,9 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSBuilder;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Future IO Helper methods.
* <p>
Expand All @@ -55,6 +62,7 @@
@InterfaceStability.Unstable
public final class FutureIO {

private static final Logger LOG = LoggerFactory.getLogger(FutureIO.class.getName());
private FutureIO() {
}

Expand Down Expand Up @@ -114,6 +122,77 @@ public static <T> T awaitFuture(final Future<T> future,
}
}

/**
* Evaluates a collection of futures and returns their results as a list.
* <p>
* This method blocks until all futures in the collection have completed.
* If any future throws an exception during its execution, this method
* extracts and rethrows that exception.
* </p>
*
* @param collection collection of futures to be evaluated
* @param <T> type of the result.
* @return the list of future's result, if all went well.
* @throws InterruptedIOException future was interrupted
* @throws IOException if something went wrong
* @throws RuntimeException any nested RTE thrown
*/
public static <T> List<T> awaitAllFutures(final Collection<Future<T>> collection)
throws InterruptedIOException, IOException, RuntimeException {
List<T> results = new ArrayList<>();
try {
for (Future<T> future : collection) {
results.add(future.get());
}
return results;
} catch (InterruptedException e) {
LOG.debug("Execution of future interrupted ", e);
throw (InterruptedIOException) new InterruptedIOException(e.toString())
.initCause(e);
} catch (ExecutionException e) {
LOG.debug("Execution of future failed with exception", e.getCause());
return raiseInnerCause(e);
}
}

/**
* Evaluates a collection of futures and returns their results as a list,
* but only waits up to the specified timeout for each future to complete.
* <p>
* This method blocks until all futures in the collection have completed or
* the timeout expires, whichever happens first. If any future throws an
* exception during its execution, this method extracts and rethrows that exception.
* </p>
*
* @param collection collection of futures to be evaluated
* @param duration timeout duration
* @param <T> type of the result.
* @return the list of future's result, if all went well.
* @throws InterruptedIOException future was interrupted
* @throws IOException if something went wrong
* @throws RuntimeException any nested RTE thrown
* @throws TimeoutException the future timed out.
*/
public static <T> List<T> awaitAllFutures(final Collection<Future<T>> collection,
final Duration duration)
throws InterruptedIOException, IOException, RuntimeException,
TimeoutException {
List<T> results = new ArrayList<>();
try {
for (Future<T> future : collection) {
results.add(future.get(duration.toMillis(), TimeUnit.MILLISECONDS));
}
return results;
} catch (InterruptedException e) {
LOG.debug("Execution of future interrupted ", e);
throw (InterruptedIOException) new InterruptedIOException(e.toString())
.initCause(e);
} catch (ExecutionException e) {
LOG.debug("Execution of future failed with exception", e.getCause());
return raiseInnerCause(e);
}
}

/**
* From the inner cause of an execution exception, extract the inner cause
* if it is an IOE or RTE.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1230,11 +1230,6 @@ public void setOptimizeFooterRead(boolean optimizeFooterRead) {
this.optimizeFooterRead = optimizeFooterRead;
}

@VisibleForTesting
public void setFooterReadBufferSize(int footerReadBufferSize) {
this.footerReadBufferSize = footerReadBufferSize;
}

@VisibleForTesting
public void setEnableAbfsListIterator(boolean enableAbfsListIterator) {
this.enableAbfsListIterator = enableAbfsListIterator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ private FSDataInputStream open(final Path path,
try {
TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.OPEN, tracingHeaderFormat, listener);
InputStream inputStream = abfsStore
InputStream inputStream = getAbfsStore()
.openFileForRead(qualifiedPath, parameters, statistics, tracingContext);
return new FSDataInputStream(inputStream);
} catch (AzureBlobFileSystemException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -898,21 +898,21 @@ private AbfsInputStreamContext populateAbfsInputStreamContext(
.map(c -> c.getBoolean(FS_AZURE_BUFFERED_PREAD_DISABLE, false))
.orElse(false);
int footerReadBufferSize = options.map(c -> c.getInt(
AZURE_FOOTER_READ_BUFFER_SIZE, abfsConfiguration.getFooterReadBufferSize()))
.orElse(abfsConfiguration.getFooterReadBufferSize());
return new AbfsInputStreamContext(abfsConfiguration.getSasTokenRenewPeriodForStreamsInSeconds())
.withReadBufferSize(abfsConfiguration.getReadBufferSize())
.withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth())
.withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends())
.isReadAheadEnabled(abfsConfiguration.isReadAheadEnabled())
.withReadSmallFilesCompletely(abfsConfiguration.readSmallFilesCompletely())
.withOptimizeFooterRead(abfsConfiguration.optimizeFooterRead())
AZURE_FOOTER_READ_BUFFER_SIZE, getAbfsConfiguration().getFooterReadBufferSize()))
.orElse(getAbfsConfiguration().getFooterReadBufferSize());
return new AbfsInputStreamContext(getAbfsConfiguration().getSasTokenRenewPeriodForStreamsInSeconds())
.withReadBufferSize(getAbfsConfiguration().getReadBufferSize())
.withReadAheadQueueDepth(getAbfsConfiguration().getReadAheadQueueDepth())
.withTolerateOobAppends(getAbfsConfiguration().getTolerateOobAppends())
.isReadAheadEnabled(getAbfsConfiguration().isReadAheadEnabled())
.withReadSmallFilesCompletely(getAbfsConfiguration().readSmallFilesCompletely())
.withOptimizeFooterRead(getAbfsConfiguration().optimizeFooterRead())
.withFooterReadBufferSize(footerReadBufferSize)
.withReadAheadRange(abfsConfiguration.getReadAheadRange())
.withReadAheadRange(getAbfsConfiguration().getReadAheadRange())
.withStreamStatistics(new AbfsInputStreamStatisticsImpl())
.withShouldReadBufferSizeAlways(
abfsConfiguration.shouldReadBufferSizeAlways())
.withReadAheadBlockSize(abfsConfiguration.getReadAheadBlockSize())
getAbfsConfiguration().shouldReadBufferSizeAlways())
.withReadAheadBlockSize(getAbfsConfiguration().getReadAheadBlockSize())
.withBufferedPreadDisabled(bufferedPreadDisabled)
.withEncryptionAdapter(contextEncryptionAdapter)
.withAbfsBackRef(fsBackRef)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,10 @@ public AbfsInputStream(
this.path = path;
this.contentLength = contentLength;
this.bufferSize = abfsInputStreamContext.getReadBufferSize();
this.footerReadSize = abfsInputStreamContext.getFooterReadBufferSize();
/*
* FooterReadSize should not be more than bufferSize.
*/
this.footerReadSize = Math.min(bufferSize, abfsInputStreamContext.getFooterReadBufferSize());
this.readAheadQueueDepth = abfsInputStreamContext.getReadAheadQueueDepth();
this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends();
this.eTag = eTag;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public abstract class AbstractAbfsIntegrationTest extends
private AuthType authType;
private boolean useConfiguredFileSystem = false;
private boolean usingFilesystemForSASTests = false;
private static final int SHORTENED_GUID_LEN = 12;
public static final int SHORTENED_GUID_LEN = 12;

protected AbstractAbfsIntegrationTest() throws Exception {
fileSystemName = TEST_CONTAINER_PREFIX + UUID.randomUUID().toString();
Expand Down Expand Up @@ -366,6 +366,14 @@ public AbfsConfiguration getConfiguration() {
return abfsConfig;
}

public AbfsConfiguration getConfiguration(AzureBlobFileSystem fs) {
return fs.getAbfsStore().getAbfsConfiguration();
}

public Map<String, Long> getInstrumentationMap(AzureBlobFileSystem fs) {
return fs.getInstrumentationMap();
}

public Configuration getRawConfiguration() {
return abfsConfig.getRawConfiguration();
}
Expand Down
Loading

0 comments on commit 6404692

Please sign in to comment.