diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index c0e530cb5ce40..14fcba90ac130 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -152,6 +152,10 @@ import org.apache.hadoop.fs.s3a.impl.UploadContentProviders; import org.apache.hadoop.fs.s3a.impl.CSEUtils; import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream; +import org.apache.hadoop.fs.s3a.streams.ClassicInputStreamFactory; +import org.apache.hadoop.fs.s3a.streams.FactoryStreamParameters; +import org.apache.hadoop.fs.s3a.streams.InputStreamFactory; +import org.apache.hadoop.fs.s3a.streams.StreamReadCallbacks; import org.apache.hadoop.fs.s3a.tools.MarkerToolOperations; import org.apache.hadoop.fs.s3a.tools.MarkerToolOperationsImpl; import org.apache.hadoop.fs.statistics.DurationTracker; @@ -305,9 +309,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, private String username; - /** - * Store back end. - */ private S3AStore store; /** @@ -357,7 +358,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, /** Log to warn of storage class configuration problems. */ private static final LogExactlyOnce STORAGE_CLASS_WARNING = new LogExactlyOnce(LOG); - private LocalDirAllocator directoryAllocator; private String cannedACL; /** @@ -803,12 +803,12 @@ public void initialize(URI name, Configuration originalConf) s3AccessGrantsEnabled = conf.getBoolean(AWS_S3_ACCESS_GRANTS_ENABLED, false); int rateLimitCapacity = intOption(conf, S3A_IO_RATE_LIMIT, DEFAULT_S3A_IO_RATE_LIMIT, 0); - // now create the store + // now create and initialize the store store = createS3AStore(clientManager, rateLimitCapacity); // the s3 client is created through the store, rather than // directly through the client manager. // this is to aid mocking. - s3Client = store.getOrCreateS3Client(); + s3Client = getStore().getOrCreateS3Client(); // The filesystem is now ready to perform operations against // S3 // This initiates a probe against S3 for the bucket existing. @@ -851,7 +851,7 @@ private S3AFileSystemOperations createFileSystemHandler() { /** - * Create the S3AStore instance. + * Create and start the S3AStore instance. * This is protected so that tests can override it. * @param clientManager client manager * @param rateLimitCapacity rate limit @@ -860,7 +860,7 @@ private S3AFileSystemOperations createFileSystemHandler() { @VisibleForTesting protected S3AStore createS3AStore(final ClientManager clientManager, final int rateLimitCapacity) { - return new S3AStoreBuilder() + final S3AStore st = new S3AStoreBuilder() .withAuditSpanSource(getAuditManager()) .withClientManager(clientManager) .withDurationTrackerFactory(getDurationTrackerFactory()) @@ -872,6 +872,9 @@ protected S3AStore createS3AStore(final ClientManager clientManager, .withReadRateLimiter(unlimitedRate()) .withWriteRateLimiter(RateLimitingFactory.create(rateLimitCapacity)) .build(); + st.init(getConf()); + st.start(); + return st; } /** @@ -1344,6 +1347,15 @@ public FlagSet getPerformanceFlags() { return performanceFlags; } + + /** + * Get the store for low-level operations. + * @return the store the S3A FS is working through. + */ + private S3AStore getStore() { + return store; + } + /** * Implementation of all operations used by delegation tokens. */ @@ -1549,7 +1561,7 @@ public S3Client getAmazonS3Client(String reason) { @Override public S3AStore getStore() { - return store; + return S3AFileSystem.this.getStore(); } /** @@ -1678,28 +1690,8 @@ public S3AEncryptionMethods getS3EncryptionAlgorithm() { */ File createTmpFileForWrite(String pathStr, long size, Configuration conf) throws IOException { - initLocalDirAllocatorIfNotInitialized(conf); - Path path = directoryAllocator.getLocalPathForWrite(pathStr, - size, conf); - File dir = new File(path.getParent().toUri().getPath()); - String prefix = path.getName(); - // create a temp file on this directory - return File.createTempFile(prefix, null, dir); - } - /** - * Initialize dir allocator if not already initialized. - * - * @param conf The Configuration object. - */ - private void initLocalDirAllocatorIfNotInitialized(Configuration conf) { - if (directoryAllocator == null) { - synchronized (this) { - String bufferDir = conf.get(BUFFER_DIR) != null - ? BUFFER_DIR : HADOOP_TMP_DIR; - directoryAllocator = new LocalDirAllocator(bufferDir); - } - } + return getS3AInternals().getStore().createTemporaryFileForWriting(pathStr, size, conf); } /** @@ -1894,7 +1886,7 @@ private FSDataInputStream executeOpen( if (this.prefetchEnabled) { Configuration configuration = getConf(); - initLocalDirAllocatorIfNotInitialized(configuration); + return new FSDataInputStream( new S3APrefetchingInputStream( readContext.build(), @@ -1902,19 +1894,30 @@ private FSDataInputStream executeOpen( createInputStreamCallbacks(auditSpan), inputStreamStats, configuration, - directoryAllocator)); + getStore().getDirectoryAllocator())); } else { + + // create the factory. + // TODO: move into S3AStore and export the factory API through + // the store, which will add some of the features (callbacks, stats) + // before invoking the real factory + InputStreamFactory factory = new ClassicInputStreamFactory(); + factory.init(getConf()); + factory.start(); + FactoryStreamParameters parameters = new FactoryStreamParameters() + .withCallbacks(createInputStreamCallbacks(auditSpan)) + .withObjectAttributes(createObjectAttributes(path, fileStatus)) + .withContext(readContext.build()) + .withStreamStatistics(inputStreamStats) + .withBoundedThreadPool(new SemaphoredDelegatingExecutor( + boundedThreadPool, + vectoredActiveRangeReads, + true, + inputStreamStats)) + .build(); + return new FSDataInputStream( - new S3AInputStream( - readContext.build(), - createObjectAttributes(path, fileStatus), - createInputStreamCallbacks(auditSpan), - inputStreamStats, - new SemaphoredDelegatingExecutor( - boundedThreadPool, - vectoredActiveRangeReads, - true, - inputStreamStats))); + factory.create(parameters)); } } @@ -1922,7 +1925,7 @@ private FSDataInputStream executeOpen( * Override point: create the callbacks for S3AInputStream. * @return an implementation of the InputStreamCallbacks, */ - private S3AInputStream.InputStreamCallbacks createInputStreamCallbacks( + private StreamReadCallbacks createInputStreamCallbacks( final AuditSpan auditSpan) { return new InputStreamCallbacksImpl(auditSpan); } @@ -1931,7 +1934,7 @@ private S3AInputStream.InputStreamCallbacks createInputStreamCallbacks( * Operations needed by S3AInputStream to read data. */ private final class InputStreamCallbacksImpl implements - S3AInputStream.InputStreamCallbacks { + StreamReadCallbacks { /** * Audit span to activate before each call. @@ -1967,7 +1970,7 @@ public ResponseInputStream getObject(GetObjectRequest request IOException { // active the audit span used for the operation try (AuditSpan span = auditSpan.activate()) { - return fsHandler.getObject(store, request, getRequestFactory()); + return fsHandler.getObject(getStore(), request, getRequestFactory()); } } @@ -1997,7 +2000,7 @@ private final class WriteOperationHelperCallbacksImpl @Retries.OnceRaw public CompleteMultipartUploadResponse completeMultipartUpload( CompleteMultipartUploadRequest request) { - return store.completeMultipartUpload(request); + return getStore().completeMultipartUpload(request); } @Override @@ -2007,7 +2010,7 @@ public UploadPartResponse uploadPart( final RequestBody body, final DurationTrackerFactory durationTrackerFactory) throws AwsServiceException, UncheckedIOException { - return store.uploadPart(request, body, durationTrackerFactory); + return getStore().uploadPart(request, body, durationTrackerFactory); } /** @@ -2804,7 +2807,7 @@ public long getDefaultBlockSize(Path path) { */ @Override public long getObjectSize(S3Object s3Object) throws IOException { - return fsHandler.getS3ObjectSize(s3Object.key(), s3Object.size(), store, null); + return fsHandler.getS3ObjectSize(s3Object.key(), s3Object.size(), getStore(), null); } @Override @@ -3035,7 +3038,7 @@ protected DurationTrackerFactory getDurationTrackerFactory() { */ protected DurationTrackerFactory nonNullDurationTrackerFactory( DurationTrackerFactory factory) { - return store.nonNullDurationTrackerFactory(factory); + return getStore().nonNullDurationTrackerFactory(factory); } /** @@ -3073,7 +3076,7 @@ protected HeadObjectResponse getObjectMetadata(String key, ChangeTracker changeTracker, Invoker changeInvoker, String operation) throws IOException { - return store.headObject(key, changeTracker, changeInvoker, fsHandler, operation); + return getStore().headObject(key, changeTracker, changeInvoker, fsHandler, operation); } /** @@ -3221,7 +3224,7 @@ public void incrementWriteOperations() { protected void deleteObject(String key) throws SdkException, IOException { incrementWriteOperations(); - store.deleteObject(getRequestFactory() + getStore().deleteObject(getRequestFactory() .newDeleteObjectRequestBuilder(key) .build()); } @@ -3275,7 +3278,7 @@ void deleteObjectAtPath(Path f, private DeleteObjectsResponse deleteObjects(DeleteObjectsRequest deleteRequest) throws MultiObjectDeleteException, SdkException, IOException { incrementWriteOperations(); - DeleteObjectsResponse response = store.deleteObjects(deleteRequest).getValue(); + DeleteObjectsResponse response = getStore().deleteObjects(deleteRequest).getValue(); if (!response.errors().isEmpty()) { throw new MultiObjectDeleteException(response.errors()); } @@ -3318,7 +3321,7 @@ public PutObjectRequest.Builder newPutObjectRequestBuilder(String key, @Retries.OnceRaw public UploadInfo putObject(PutObjectRequest putObjectRequest, File file, ProgressableProgressListener listener) throws IOException { - return store.putObject(putObjectRequest, file, listener); + return getStore().putObject(putObjectRequest, file, listener); } /** @@ -3419,7 +3422,7 @@ UploadPartResponse uploadPart(UploadPartRequest request, RequestBody body, * @param bytes bytes in the request. */ protected void incrementPutStartStatistics(long bytes) { - store.incrementPutStartStatistics(bytes); + getStore().incrementPutStartStatistics(bytes); } /** @@ -3430,7 +3433,7 @@ protected void incrementPutStartStatistics(long bytes) { * @param bytes bytes in the request. */ protected void incrementPutCompletedStatistics(boolean success, long bytes) { - store.incrementPutCompletedStatistics(success, bytes); + getStore().incrementPutCompletedStatistics(success, bytes); } /** @@ -3441,7 +3444,7 @@ protected void incrementPutCompletedStatistics(boolean success, long bytes) { * @param bytes bytes successfully uploaded. */ protected void incrementPutProgressStatistics(String key, long bytes) { - store.incrementPutProgressStatistics(key, bytes); + getStore().incrementPutProgressStatistics(key, bytes); } /** @@ -4321,9 +4324,9 @@ PutObjectResponse executePut( String key = putObjectRequest.key(); long len = getPutRequestLength(putObjectRequest); ProgressableProgressListener listener = - new ProgressableProgressListener(store, putObjectRequest.key(), progress); + new ProgressableProgressListener(getStore(), putObjectRequest.key(), progress); UploadInfo info = putObject(putObjectRequest, file, listener); - PutObjectResponse result = store.waitForUploadCompletion(key, info).response(); + PutObjectResponse result = getStore().waitForUploadCompletion(key, info).response(); listener.uploadCompleted(info.getFileUpload()); // post-write actions @@ -4421,7 +4424,7 @@ public void close() throws IOException { protected synchronized void stopAllServices() { try { trackDuration(getDurationTrackerFactory(), FILESYSTEM_CLOSE.getSymbol(), () -> { - closeAutocloseables(LOG, store); + closeAutocloseables(LOG, getStore()); store = null; s3Client = null; @@ -4642,7 +4645,7 @@ private CopyObjectResponse copyFile(String srcKey, String dstKey, long size, () -> { incrementStatistic(OBJECT_COPY_REQUESTS); - Copy copy = store.getOrCreateTransferManager().copy( + Copy copy = getStore().getOrCreateTransferManager().copy( CopyRequest.builder() .copyObjectRequest(copyRequest) .build()); @@ -5857,7 +5860,7 @@ public BulkDelete createBulkDelete(final Path path) */ protected BulkDeleteOperation.BulkDeleteOperationCallbacks createBulkDeleteCallbacks( Path path, int pageSize, AuditSpanS3A span) { - return new BulkDeleteOperationCallbacksImpl(store, pathToKey(path), pageSize, span); + return new BulkDeleteOperationCallbacksImpl(getStore(), pathToKey(path), pageSize, span); } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index c620ca042dc82..c61cc2bd358dc 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -19,7 +19,6 @@ package org.apache.hadoop.fs.s3a; import javax.annotation.Nullable; -import java.io.Closeable; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; @@ -30,7 +29,6 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.IntFunction; @@ -41,6 +39,8 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.impl.LeakReporter; +import org.apache.hadoop.fs.s3a.streams.AbstractS3AInputStream; +import org.apache.hadoop.fs.s3a.streams.FactoryStreamParameters; import org.apache.hadoop.fs.statistics.StreamStatisticNames; import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; @@ -49,7 +49,6 @@ import org.apache.hadoop.fs.CanSetReadahead; import org.apache.hadoop.fs.CanUnbuffer; import org.apache.hadoop.fs.FSExceptionMessages; -import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileRange; import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.impl.CombinedFileRange; @@ -57,17 +56,11 @@ import org.apache.hadoop.fs.s3a.impl.ChangeTracker; import org.apache.hadoop.fs.s3a.impl.InternalConstants; import org.apache.hadoop.fs.s3a.impl.SDKStreamDrainer; -import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; import org.apache.hadoop.fs.statistics.DurationTracker; -import org.apache.hadoop.fs.statistics.IOStatistics; -import org.apache.hadoop.fs.statistics.IOStatisticsAggregator; import org.apache.hadoop.fs.statistics.IOStatisticsSource; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.util.functional.CallableRaisingIOE; -import static java.util.Objects.requireNonNull; -import static org.apache.commons.lang3.StringUtils.isNotEmpty; import static org.apache.hadoop.fs.VectoredReadUtils.isOrderedDisjoint; import static org.apache.hadoop.fs.VectoredReadUtils.mergeSortedRanges; import static org.apache.hadoop.fs.VectoredReadUtils.validateAndSortRanges; @@ -94,7 +87,7 @@ */ @InterfaceAudience.Private @InterfaceStability.Evolving -public class S3AInputStream extends FSInputStream implements CanSetReadahead, +public class S3AInputStream extends AbstractS3AInputStream implements CanSetReadahead, CanUnbuffer, StreamCapabilities, IOStatisticsSource { public static final String E_NEGATIVE_READAHEAD_VALUE @@ -134,6 +127,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, * and returned in {@link #getPos()}. */ private long pos; + /** * Closed bit. Volatile so reads are non-blocking. * Updates must be in a synchronized block to guarantee an atomic check and @@ -144,30 +138,12 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, * Input stream returned by a getObject call. */ private ResponseInputStream wrappedStream; - private final S3AReadOpContext context; - private final InputStreamCallbacks client; - - /** - * Thread pool used for vectored IO operation. - */ - private final ExecutorService boundedThreadPool; - private final String bucket; - private final String key; - private final String pathStr; - - /** - * Content length from HEAD or openFile option. - */ - private final long contentLength; /** * Content length in format for vector IO. */ private final Optional fileLength; - private final String uri; - private final S3AInputStreamStatistics streamStatistics; - private S3AInputPolicy inputPolicy; private long readahead = Constants.DEFAULT_READAHEAD_RANGE; /** Vectored IO context. */ @@ -193,96 +169,31 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, /** change tracker. */ private final ChangeTracker changeTracker; - /** - * IOStatistics report. - */ - private final IOStatistics ioStatistics; - /** * Threshold for stream reads to switch to * asynchronous draining. */ - private long asyncDrainThreshold; - - /** Aggregator used to aggregate per thread IOStatistics. */ - private final IOStatisticsAggregator threadIOStatistics; - - /** - * Report of leaks. - * with report and abort unclosed streams in finalize(). - */ - private final LeakReporter leakReporter; + private final long asyncDrainThreshold; /** * Create the stream. * This does not attempt to open it; that is only done on the first * actual read() operation. - * @param ctx operation context - * @param s3Attributes object attributes - * @param client S3 client to use - * @param streamStatistics stream io stats. - * @param boundedThreadPool thread pool to use. */ - public S3AInputStream(S3AReadOpContext ctx, - S3ObjectAttributes s3Attributes, - InputStreamCallbacks client, - S3AInputStreamStatistics streamStatistics, - ExecutorService boundedThreadPool) { - Preconditions.checkArgument(isNotEmpty(s3Attributes.getBucket()), - "No Bucket"); - Preconditions.checkArgument(isNotEmpty(s3Attributes.getKey()), "No Key"); - long l = s3Attributes.getLen(); - Preconditions.checkArgument(l >= 0, "Negative content length"); - this.context = ctx; - this.bucket = s3Attributes.getBucket(); - this.key = s3Attributes.getKey(); - this.pathStr = s3Attributes.getPath().toString(); - this.contentLength = l; - this.fileLength = Optional.of(contentLength); - this.client = client; - this.uri = "s3a://" + this.bucket + "/" + this.key; - this.streamStatistics = streamStatistics; - this.ioStatistics = streamStatistics.getIOStatistics(); - this.changeTracker = new ChangeTracker(uri, - ctx.getChangeDetectionPolicy(), - streamStatistics.getChangeTrackerStatistics(), - s3Attributes); - setInputPolicy(ctx.getInputPolicy()); - setReadahead(ctx.getReadahead()); - this.asyncDrainThreshold = ctx.getAsyncDrainThreshold(); - this.boundedThreadPool = boundedThreadPool; - this.vectoredIOContext = context.getVectoredIOContext(); - this.threadIOStatistics = requireNonNull(ctx.getIOStatisticsAggregator()); - // build the leak reporter - this.leakReporter = new LeakReporter( - "Stream not closed while reading " + uri, - this::isStreamOpen, - () -> abortInFinalizer()); - } + public S3AInputStream(FactoryStreamParameters parameters) { - /** - * Finalizer. - *

- * Verify that the inner stream is closed. - *

- * If it is not, it means streams are being leaked in application code. - * Log a warning, including the stack trace of the caller, - * then abort the stream. - *

- * This does not attempt to invoke {@link #close()} as that is - * a more complex operation, and this method is being executed - * during a GC finalization phase. - *

- * Applications MUST close their streams; this is a defensive - * operation to return http connections and warn the end users - * that their applications are at risk of running out of connections. - * - * {@inheritDoc} - */ - @Override - protected void finalize() throws Throwable { - leakReporter.close(); - super.finalize(); + super(parameters); + + + this.fileLength = Optional.of(getContentLength()); + S3AReadOpContext context = getContext(); + this.changeTracker = new ChangeTracker(getUri(), + context.getChangeDetectionPolicy(), + getStreamStatistics().getChangeTrackerStatistics(), + getObjectAttributes()); + setReadahead(context.getReadahead()); + this.asyncDrainThreshold = context.getAsyncDrainThreshold(); + this.vectoredIOContext = this.getContext().getVectoredIOContext(); } /** @@ -290,7 +201,8 @@ protected void finalize() throws Throwable { * Not synchronized; the flag is volatile. * @return true if the stream is still open. */ - private boolean isStreamOpen() { + @Override + protected boolean isStreamOpen() { return !closed; } @@ -298,10 +210,11 @@ private boolean isStreamOpen() { * Brute force stream close; invoked by {@link LeakReporter}. * All exceptions raised are ignored. */ - private void abortInFinalizer() { + @Override + protected void abortInFinalizer() { try { // stream was leaked: update statistic - streamStatistics.streamLeaked(); + getS3AStreamStatistics().streamLeaked(); // abort the stream. This merges statistics into the filesystem. closeStream("finalize()", true, true).get(); } catch (InterruptedException | ExecutionException ignroed) { @@ -309,32 +222,12 @@ private void abortInFinalizer() { } } - /** - * Set/update the input policy of the stream. - * This updates the stream statistics. - * @param inputPolicy new input policy. - */ - private void setInputPolicy(S3AInputPolicy inputPolicy) { - LOG.debug("Switching to input policy {}", inputPolicy); - this.inputPolicy = inputPolicy; - streamStatistics.inputPolicySet(inputPolicy.ordinal()); - } - - /** - * Get the current input policy. - * @return input policy. - */ - @VisibleForTesting - public S3AInputPolicy getInputPolicy() { - return inputPolicy; - } - /** * If the stream is in Adaptive mode, switch to random IO at this * point. Unsynchronized. */ private void maybeSwitchToRandomIO() { - if (inputPolicy.isAdaptive()) { + if (getInputPolicy().isAdaptive()) { setInputPolicy(S3AInputPolicy.Random); } } @@ -355,24 +248,24 @@ private synchronized void reopen(String reason, long targetPos, long length, closeStream("reopen(" + reason + ")", forceAbort, false); } - contentRangeFinish = calculateRequestLimit(inputPolicy, targetPos, - length, contentLength, readahead); + contentRangeFinish = calculateRequestLimit(getInputPolicy(), targetPos, + length, getContentLength(), readahead); LOG.debug("reopen({}) for {} range[{}-{}], length={}," + " streamPosition={}, nextReadPosition={}, policy={}", - uri, reason, targetPos, contentRangeFinish, length, pos, nextReadPos, - inputPolicy); + getUri(), reason, targetPos, contentRangeFinish, length, pos, nextReadPos, + getInputPolicy()); - GetObjectRequest request = client.newGetRequestBuilder(key) + GetObjectRequest request = getCallbacks().newGetRequestBuilder(getKey()) .range(S3AUtils.formatRange(targetPos, contentRangeFinish - 1)) .applyMutation(changeTracker::maybeApplyConstraint) .build(); - long opencount = streamStatistics.streamOpened(); + long opencount = getS3AStreamStatistics().streamOpened(); String operation = opencount == 0 ? OPERATION_OPEN : OPERATION_REOPEN; String text = String.format("%s %s at %d", - operation, uri, targetPos); - wrappedStream = onceTrackingDuration(text, uri, - streamStatistics.initiateGetRequest(), () -> - client.getObject(request)); + operation, getUri(), targetPos); + wrappedStream = onceTrackingDuration(text, getUri(), + getS3AStreamStatistics().initiateGetRequest(), () -> + getCallbacks().getObject(request)); changeTracker.processResponse(wrappedStream.response(), operation, targetPos); @@ -396,7 +289,7 @@ public synchronized void seek(long targetPos) throws IOException { + " " + targetPos); } - if (this.contentLength <= 0) { + if (this.getContentLength() <= 0) { return; } @@ -414,7 +307,7 @@ private void seekQuietly(long positiveTargetPos) { seek(positiveTargetPos); } catch (IOException ioe) { LOG.debug("Ignoring IOE on seek of {} to {}", - uri, positiveTargetPos, ioe); + getUri(), positiveTargetPos, ioe); } } @@ -449,12 +342,12 @@ private void seekInStream(long targetPos, long length) throws IOException { && diff < forwardSeekLimit; if (skipForward) { // the forward seek range is within the limits - LOG.debug("Forward seek on {}, of {} bytes", uri, diff); + LOG.debug("Forward seek on {}, of {} bytes", getUri(), diff); long skipped = wrappedStream.skip(diff); if (skipped > 0) { pos += skipped; } - streamStatistics.seekForwards(diff, skipped); + getS3AStreamStatistics().seekForwards(diff, skipped); if (pos == targetPos) { // all is well @@ -464,15 +357,15 @@ private void seekInStream(long targetPos, long length) throws IOException { } else { // log a warning; continue to attempt to re-open LOG.warn("Failed to seek on {} to {}. Current position {}", - uri, targetPos, pos); + getUri(), targetPos, pos); } } else { // not attempting to read any bytes from the stream - streamStatistics.seekForwards(diff, 0); + getS3AStreamStatistics().seekForwards(diff, 0); } } else if (diff < 0) { // backwards seek - streamStatistics.seekBackwards(diff); + getS3AStreamStatistics().seekBackwards(diff); // if the stream is in "Normal" mode, switch to random IO at this // point, as it is indicative of columnar format IO maybeSwitchToRandomIO(); @@ -513,8 +406,8 @@ public boolean seekToNewSource(long targetPos) throws IOException { @Retries.RetryTranslated private void lazySeek(long targetPos, long len) throws IOException { - Invoker invoker = context.getReadInvoker(); - invoker.retry("lazySeek to " + targetPos, pathStr, true, + Invoker invoker = getContext().getReadInvoker(); + invoker.retry("lazySeek to " + targetPos, getPathStr(), true, () -> { //For lazy seek seekInStream(targetPos, len); @@ -532,9 +425,9 @@ private void lazySeek(long targetPos, long len) throws IOException { * @param bytesRead number of bytes read */ private void incrementBytesRead(long bytesRead) { - streamStatistics.bytesRead(bytesRead); - if (context.stats != null && bytesRead > 0) { - context.stats.incrementBytesRead(bytesRead); + getS3AStreamStatistics().bytesRead(bytesRead); + if (getContext().stats != null && bytesRead > 0) { + getContext().stats.incrementBytesRead(bytesRead); } } @@ -542,7 +435,7 @@ private void incrementBytesRead(long bytesRead) { @Retries.RetryTranslated public synchronized int read() throws IOException { checkNotClosed(); - if (this.contentLength == 0 || (nextReadPos >= contentLength)) { + if (this.getContentLength() == 0 || (nextReadPos >= getContentLength())) { return -1; } @@ -554,8 +447,8 @@ public synchronized int read() throws IOException { return -1; } - Invoker invoker = context.getReadInvoker(); - int byteRead = invoker.retry("read", pathStr, true, + Invoker invoker = getContext().getReadInvoker(); + int byteRead = invoker.retry("read", getPathStr(), true, () -> { int b; // When exception happens before re-setting wrappedStream in "reopen" called @@ -597,13 +490,13 @@ private void onReadFailure(IOException ioe, boolean forceAbort) { if (LOG.isDebugEnabled()) { LOG.debug("Got exception while trying to read from stream {}, " + "client: {} object: {}, trying to recover: ", - uri, client, objectResponse, ioe); + getUri(), getCallbacks(), objectResponse, ioe); } else { LOG.info("Got exception while trying to read from stream {}, " + "client: {} object: {}, trying to recover: " + ioe, - uri, client, objectResponse); + getUri(), getCallbacks(), objectResponse); } - streamStatistics.readException(); + getS3AStreamStatistics().readException(); closeStream("failure recovery", forceAbort, false); } @@ -638,7 +531,7 @@ public synchronized int read(byte[] buf, int off, int len) return 0; } - if (this.contentLength == 0 || (nextReadPos >= contentLength)) { + if (this.getContentLength() == 0 || (nextReadPos >= getContentLength())) { return -1; } @@ -649,10 +542,10 @@ public synchronized int read(byte[] buf, int off, int len) return -1; } - Invoker invoker = context.getReadInvoker(); + Invoker invoker = getContext().getReadInvoker(); - streamStatistics.readOperationStarted(nextReadPos, len); - int bytesRead = invoker.retry("read", pathStr, true, + getS3AStreamStatistics().readOperationStarted(nextReadPos, len); + int bytesRead = invoker.retry("read", getPathStr(), true, () -> { int bytes; // When exception happens before re-setting wrappedStream in "reopen" called @@ -685,7 +578,7 @@ public synchronized int read(byte[] buf, int off, int len) } else { streamReadResultNegative(); } - streamStatistics.readOperationCompleted(len, bytesRead); + getS3AStreamStatistics().readOperationCompleted(len, bytesRead); return bytesRead; } @@ -696,7 +589,7 @@ public synchronized int read(byte[] buf, int off, int len) */ private void checkNotClosed() throws IOException { if (closed) { - throw new IOException(uri + ": " + FSExceptionMessages.STREAM_IS_CLOSED); + throw new IOException(getUri() + ": " + FSExceptionMessages.STREAM_IS_CLOSED); } } @@ -717,28 +610,14 @@ public synchronized void close() throws IOException { // close or abort the stream; blocking closeStream("close() operation", false, true); // end the client+audit span. - client.close(); - // this is actually a no-op - super.close(); + getCallbacks().close(); + } finally { - // merge the statistics back into the FS statistics. - streamStatistics.close(); - // Collect ThreadLevel IOStats - mergeThreadIOStatistics(streamStatistics.getIOStatistics()); + super.close(); } } } - /** - * Merging the current thread's IOStatistics with the current IOStatistics - * context. - * - * @param streamIOStats Stream statistics to be merged into thread - * statistics aggregator. - */ - private void mergeThreadIOStatistics(IOStatistics streamIOStats) { - threadIOStatistics.aggregate(streamIOStats); - } /** * Close a stream: decide whether to abort or close, based on @@ -776,11 +655,11 @@ private CompletableFuture closeStream( boolean shouldAbort = forceAbort || remaining > readahead; CompletableFuture operation; SDKStreamDrainer> drainer = new SDKStreamDrainer<>( - uri, + getUri(), wrappedStream, shouldAbort, (int) remaining, - streamStatistics, + getS3AStreamStatistics(), reason); if (blocking || shouldAbort || remaining <= asyncDrainThreshold) { @@ -792,7 +671,7 @@ private CompletableFuture closeStream( } else { LOG.debug("initiating asynchronous drain of {} bytes", remaining); // schedule an async drain/abort - operation = client.submit(drainer); + operation = getCallbacks().submit(drainer); } // either the stream is closed in the blocking call or the async call is @@ -817,7 +696,7 @@ private CompletableFuture closeStream( @InterfaceStability.Unstable public synchronized boolean resetConnection() throws IOException { checkNotClosed(); - LOG.info("Forcing reset of connection to {}", uri); + LOG.info("Forcing reset of connection to {}", getUri()); return awaitFuture(closeStream("reset()", true, true)); } @@ -839,7 +718,7 @@ public synchronized int available() throws IOException { @InterfaceAudience.Private @InterfaceStability.Unstable public synchronized long remainingInFile() { - return this.contentLength - this.pos; + return this.getContentLength() - this.pos; } /** @@ -879,17 +758,17 @@ public boolean markSupported() { @Override @InterfaceStability.Unstable public String toString() { - String s = streamStatistics.toString(); + String s = getS3AStreamStatistics().toString(); synchronized (this) { final StringBuilder sb = new StringBuilder( "S3AInputStream{"); - sb.append(uri); + sb.append(getUri()); sb.append(" wrappedStream=") .append(isObjectStreamOpen() ? "open" : "closed"); - sb.append(" read policy=").append(inputPolicy); + sb.append(" read policy=").append(getInputPolicy()); sb.append(" pos=").append(pos); sb.append(" nextReadPos=").append(nextReadPos); - sb.append(" contentLength=").append(contentLength); + sb.append(" contentLength=").append(getContentLength()); sb.append(" contentRangeStart=").append(contentRangeStart); sb.append(" contentRangeFinish=").append(contentRangeFinish); sb.append(" remainingInCurrentRequest=") @@ -920,7 +799,7 @@ public void readFully(long position, byte[] buffer, int offset, int length) throws IOException { checkNotClosed(); validatePositionedReadArgs(position, buffer, offset, length); - streamStatistics.readFullyOperationStarted(position, length); + getS3AStreamStatistics().readFullyOperationStarted(position, length); if (length == 0) { return; } @@ -971,10 +850,10 @@ public int maxReadSizeForVectorReads() { @Override public synchronized void readVectored(List ranges, IntFunction allocate) throws IOException { - LOG.debug("Starting vectored read on path {} for ranges {} ", pathStr, ranges); + LOG.debug("Starting vectored read on path {} for ranges {} ", getPathStr(), ranges); checkNotClosed(); if (stopVectoredIOOperations.getAndSet(false)) { - LOG.debug("Reinstating vectored read operation for path {} ", pathStr); + LOG.debug("Reinstating vectored read operation for path {} ", getPathStr()); } // prepare to read @@ -992,26 +871,26 @@ public synchronized void readVectored(List ranges, if (isOrderedDisjoint(sortedRanges, 1, minSeekForVectorReads())) { LOG.debug("Not merging the ranges as they are disjoint"); - streamStatistics.readVectoredOperationStarted(sortedRanges.size(), sortedRanges.size()); + getS3AStreamStatistics().readVectoredOperationStarted(sortedRanges.size(), sortedRanges.size()); for (FileRange range: sortedRanges) { ByteBuffer buffer = allocate.apply(range.getLength()); - boundedThreadPool.submit(() -> readSingleRange(range, buffer)); + getBoundedThreadPool().submit(() -> readSingleRange(range, buffer)); } } else { LOG.debug("Trying to merge the ranges as they are not disjoint"); List combinedFileRanges = mergeSortedRanges(sortedRanges, 1, minSeekForVectorReads(), maxReadSizeForVectorReads()); - streamStatistics.readVectoredOperationStarted(sortedRanges.size(), combinedFileRanges.size()); + getS3AStreamStatistics().readVectoredOperationStarted(sortedRanges.size(), combinedFileRanges.size()); LOG.debug("Number of original ranges size {} , Number of combined ranges {} ", ranges.size(), combinedFileRanges.size()); for (CombinedFileRange combinedFileRange: combinedFileRanges) { - boundedThreadPool.submit( + getBoundedThreadPool().submit( () -> readCombinedRangeAndUpdateChildren(combinedFileRange, allocate)); } } LOG.debug("Finished submitting vectored read to threadpool" + - " on path {} for ranges {} ", pathStr, ranges); + " on path {} for ranges {} ", getPathStr(), ranges); } /** @@ -1022,7 +901,7 @@ public synchronized void readVectored(List ranges, */ private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRange, IntFunction allocate) { - LOG.debug("Start reading {} from path {} ", combinedFileRange, pathStr); + LOG.debug("Start reading {} from path {} ", combinedFileRange, getPathStr()); ResponseInputStream rangeContent = null; try { rangeContent = getS3ObjectInputStream("readCombinedFileRange", @@ -1030,7 +909,7 @@ private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRa combinedFileRange.getLength()); populateChildBuffers(combinedFileRange, rangeContent, allocate); } catch (Exception ex) { - LOG.debug("Exception while reading {} from path {} ", combinedFileRange, pathStr, ex); + LOG.debug("Exception while reading {} from path {} ", combinedFileRange, getPathStr(), ex); // complete exception all the underlying ranges which have not already // finished. for(FileRange child : combinedFileRange.getUnderlying()) { @@ -1041,7 +920,7 @@ private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRa } finally { IOUtils.cleanupWithLogger(LOG, rangeContent); } - LOG.debug("Finished reading {} from path {} ", combinedFileRange, pathStr); + LOG.debug("Finished reading {} from path {} ", combinedFileRange, getPathStr()); } /** @@ -1129,7 +1008,7 @@ private void drainUnnecessaryData( remaining -= readCount; } } finally { - streamStatistics.readVectoredBytesDiscarded(drainBytes); + getS3AStreamStatistics().readVectoredBytesDiscarded(drainBytes); LOG.debug("{} bytes drained from stream ", drainBytes); } } @@ -1140,7 +1019,7 @@ private void drainUnnecessaryData( * @param buffer buffer to fill. */ private void readSingleRange(FileRange range, ByteBuffer buffer) { - LOG.debug("Start reading {} from {} ", range, pathStr); + LOG.debug("Start reading {} from {} ", range, getPathStr()); if (range.getLength() == 0) { // a zero byte read. buffer.flip(); @@ -1155,12 +1034,12 @@ private void readSingleRange(FileRange range, ByteBuffer buffer) { populateBuffer(range, buffer, objectRange); range.getData().complete(buffer); } catch (Exception ex) { - LOG.warn("Exception while reading a range {} from path {} ", range, pathStr, ex); + LOG.warn("Exception while reading a range {} from path {} ", range, getPathStr(), ex); range.getData().completeExceptionally(ex); } finally { IOUtils.cleanupWithLogger(LOG, objectRange); } - LOG.debug("Finished reading range {} from path {} ", range, pathStr); + LOG.debug("Finished reading range {} from path {} ", range, getPathStr()); } /** @@ -1274,18 +1153,18 @@ private ResponseInputStream getS3Object(String operationName, long position, int length) throws IOException { - final GetObjectRequest request = client.newGetRequestBuilder(key) + final GetObjectRequest request = getCallbacks().newGetRequestBuilder(getKey()) .range(S3AUtils.formatRange(position, position + length - 1)) .applyMutation(changeTracker::maybeApplyConstraint) .build(); - DurationTracker tracker = streamStatistics.initiateGetRequest(); + DurationTracker tracker = getS3AStreamStatistics().initiateGetRequest(); ResponseInputStream objectRange; - Invoker invoker = context.getReadInvoker(); + Invoker invoker = getContext().getReadInvoker(); try { - objectRange = invoker.retry(operationName, pathStr, true, + objectRange = invoker.retry(operationName, getPathStr(), true, () -> { checkIfVectoredIOStopped(); - return client.getObject(request); + return getCallbacks().getObject(request); }); } catch (IOException ex) { @@ -1312,18 +1191,6 @@ private void checkIfVectoredIOStopped() throws InterruptedIOException { } } - /** - * Access the input stream statistics. - * This is for internal testing and may be removed without warning. - * @return the statistics for this input stream - */ - @InterfaceAudience.Private - @InterfaceStability.Unstable - @VisibleForTesting - public S3AInputStreamStatistics getS3AStreamStatistics() { - return streamStatistics; - } - @Override public synchronized void setReadahead(Long readahead) { this.readahead = validateReadahead(readahead); @@ -1409,8 +1276,8 @@ public synchronized void unbuffer() { stopVectoredIOOperations.set(true); closeStream("unbuffer()", false, false); } finally { - streamStatistics.unbuffered(); - if (inputPolicy.isAdaptive()) { + getS3AStreamStatistics().unbuffered(); + if (getInputPolicy().isAdaptive()) { S3AInputPolicy policy = S3AInputPolicy.Random; setInputPolicy(policy); } @@ -1423,12 +1290,9 @@ public boolean hasCapability(String capability) { case StreamCapabilities.IOSTATISTICS: case StreamCapabilities.IOSTATISTICS_CONTEXT: case StreamStatisticNames.STREAM_LEAKS: - case StreamCapabilities.READAHEAD: - case StreamCapabilities.UNBUFFER: - case StreamCapabilities.VECTOREDIO: return true; default: - return false; + return super.hasCapability(capability); } } @@ -1441,11 +1305,6 @@ public boolean isObjectStreamOpen() { return wrappedStream != null; } - @Override - public IOStatistics getIOStatistics() { - return ioStatistics; - } - /** * Get the wrapped stream. * This is for testing only. @@ -1457,38 +1316,4 @@ public ResponseInputStream getWrappedStream() { return wrappedStream; } - /** - * Callbacks for input stream IO. - */ - public interface InputStreamCallbacks extends Closeable { - - /** - * Create a GET request builder. - * @param key object key - * @return the request builder - */ - GetObjectRequest.Builder newGetRequestBuilder(String key); - - /** - * Execute the request. - * When CSE is enabled with reading of unencrypted data, The object is checked if it is - * encrypted and if so, the request is made with encrypted S3 client. If the object is - * not encrypted, the request is made with unencrypted s3 client. - * @param request the request - * @return the response - * @throws IOException on any failure. - */ - @Retries.OnceRaw - ResponseInputStream getObject(GetObjectRequest request) throws IOException; - - /** - * Submit some asynchronous work, for example, draining a stream. - * @param operation operation to invoke - * @param return type - * @return a future. - */ - CompletableFuture submit(CallableRaisingIOE operation); - - } - } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java index 55351f0c81396..67fc38d2c1ef8 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java @@ -34,7 +34,7 @@ import static java.util.Objects.requireNonNull; /** - * Read-specific operation context struct. + * Read-specific operation context structure. */ public class S3AReadOpContext extends S3AOpContext { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java index ab8785e01dafd..c1cdc172600d6 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java @@ -45,6 +45,8 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.s3a.api.RequestFactory; import org.apache.hadoop.fs.s3a.impl.ChangeTracker; import org.apache.hadoop.fs.s3a.impl.ClientManager; @@ -54,6 +56,7 @@ import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext; import org.apache.hadoop.fs.statistics.DurationTrackerFactory; import org.apache.hadoop.fs.statistics.IOStatisticsSource; +import org.apache.hadoop.service.Service; /** * Interface for the S3A Store; @@ -63,10 +66,14 @@ * The {@link ClientManager} interface is used to create the AWS clients; * the base implementation forwards to the implementation of this interface * passed in at construction time. + *

+ * The interface extends the Hadoop {@link Service} interface + * and follows its lifecycle: it MUST NOT be used until + * {@link Service#init(Configuration)} has been invoked. */ @InterfaceAudience.LimitedPrivate("Extensions") @InterfaceStability.Unstable -public interface S3AStore extends IOStatisticsSource, ClientManager { +public interface S3AStore extends Service, IOStatisticsSource, ClientManager { /** * Acquire write capacity for operations. @@ -302,4 +309,26 @@ CompletedFileUpload waitForUploadCompletion(String key, UploadInfo uploadInfo) @Retries.OnceRaw CompleteMultipartUploadResponse completeMultipartUpload( CompleteMultipartUploadRequest request); + + /** + * Get the directory allocator. + * @return the directory allocator + */ + LocalDirAllocator getDirectoryAllocator(); + + /** + * Demand create the directory allocator, then create a temporary file. + * This does not mark the file for deletion when a process exits. + * Pass in a file size of {@link LocalDirAllocator#SIZE_UNKNOWN} if the + * size is unknown. + * {@link LocalDirAllocator#createTmpFileForWrite(String, long, Configuration)}. + * @param pathStr prefix for the temporary file + * @param size the size of the file that is going to be written + * @param conf the Configuration object + * @return a unique temporary file + * @throws IOException IO problems + */ + File createTemporaryFileForWriting(String pathStr, + long size, + Configuration conf) throws IOException; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManager.java index ad7afc732387f..d29fd14cbdf46 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManager.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManager.java @@ -76,8 +76,4 @@ S3TransferManager getOrCreateTransferManager() */ S3Client getOrCreateAsyncS3ClientUnchecked() throws UncheckedIOException; - /** - * Close operation is required to not raise exceptions. - */ - void close(); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java index db07881345500..f0f8235934570 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java @@ -57,7 +57,10 @@ import software.amazon.awssdk.transfer.s3.model.FileUpload; import software.amazon.awssdk.transfer.s3.model.UploadFileRequest; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.Invoker; import org.apache.hadoop.fs.s3a.ProgressableProgressListener; import org.apache.hadoop.fs.s3a.Retries; @@ -74,11 +77,14 @@ import org.apache.hadoop.fs.statistics.DurationTrackerFactory; import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.store.audit.AuditSpanSource; +import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.DurationInfo; import org.apache.hadoop.util.RateLimiting; import org.apache.hadoop.util.functional.Tuples; import static java.util.Objects.requireNonNull; +import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR; +import static org.apache.hadoop.fs.s3a.Constants.HADOOP_TMP_DIR; import static org.apache.hadoop.fs.s3a.S3AUtils.extractException; import static org.apache.hadoop.fs.s3a.S3AUtils.getPutRequestLength; import static org.apache.hadoop.fs.s3a.S3AUtils.isThrottleException; @@ -109,7 +115,8 @@ * This is where lower level storage operations are intended * to move. */ -public class S3AStoreImpl implements S3AStore { +public class S3AStoreImpl extends CompositeService + implements S3AStore { private static final Logger LOG = LoggerFactory.getLogger(S3AStoreImpl.class); @@ -165,6 +172,11 @@ public class S3AStoreImpl implements S3AStore { */ private final FileSystem.Statistics fsStatistics; + /** + * Allocator of local FS storage. + */ + private LocalDirAllocator directoryAllocator; + /** Constructor to create S3A store. */ S3AStoreImpl(StoreContextFactory storeContextFactory, ClientManager clientManager, @@ -176,6 +188,7 @@ public class S3AStoreImpl implements S3AStore { RateLimiting writeRateLimiter, AuditSpanSource auditSpanSource, @Nullable FileSystem.Statistics fsStatistics) { + super("S3AStore"); this.storeContextFactory = requireNonNull(storeContextFactory); this.clientManager = requireNonNull(clientManager); this.durationTrackerFactory = requireNonNull(durationTrackerFactory); @@ -193,10 +206,26 @@ public class S3AStoreImpl implements S3AStore { } @Override - public void close() { + protected void serviceStart() throws Exception { + super.serviceStart(); + initLocalDirAllocator(); + } + + @Override + protected void serviceStop() throws Exception { + super.serviceStop(); clientManager.close(); } + /** + * Initialize dir allocator if not already initialized. + */ + private void initLocalDirAllocator() { + String bufferDir = getConfig().get(BUFFER_DIR) != null + ? BUFFER_DIR : HADOOP_TMP_DIR; + directoryAllocator = new LocalDirAllocator(bufferDir); + } + /** Acquire write capacity for rate limiting {@inheritDoc}. */ @Override public Duration acquireWriteCapacity(final int capacity) { @@ -808,4 +837,39 @@ public CompleteMultipartUploadResponse completeMultipartUpload( return getS3Client().completeMultipartUpload(request); } + + /** + * Get the directory allocator. + * @return the directory allocator + */ + @Override + public LocalDirAllocator getDirectoryAllocator() { + return directoryAllocator; + } + + /** + * Demand create the directory allocator, then create a temporary file. + * This does not mark the file for deletion when a process exits. + * Pass in a file size of {@link LocalDirAllocator#SIZE_UNKNOWN} if the + * size is unknown. + * {@link LocalDirAllocator#createTmpFileForWrite(String, long, Configuration)}. + * @param pathStr prefix for the temporary file + * @param size the size of the file that is going to be written + * @param conf the Configuration object + * @return a unique temporary file + * @throws IOException IO problems + */ + @Override + public File createTemporaryFileForWriting(String pathStr, + long size, + Configuration conf) throws IOException { + requireNonNull(directoryAllocator, "directory allocator not initialized"); + Path path = directoryAllocator.getLocalPathForWrite(pathStr, + size, conf); + File dir = new File(path.getParent().toUri().getPath()); + String prefix = path.getName(); + // create a temp file on this directory + return File.createTempFile(prefix, null, dir); + } + } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java index e05ad7e38b5b8..3a0fd6d247fa8 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java @@ -32,10 +32,10 @@ import org.apache.hadoop.fs.impl.prefetch.BlockManagerParameters; import org.apache.hadoop.fs.impl.prefetch.BufferData; import org.apache.hadoop.fs.impl.prefetch.FilePosition; -import org.apache.hadoop.fs.s3a.S3AInputStream; import org.apache.hadoop.fs.s3a.S3AReadOpContext; import org.apache.hadoop.fs.s3a.S3ObjectAttributes; import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; +import org.apache.hadoop.fs.s3a.streams.StreamReadCallbacks; import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PREFETCH_MAX_BLOCKS_COUNT; import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_MAX_BLOCKS_COUNT; @@ -75,7 +75,7 @@ public class S3ACachingInputStream extends S3ARemoteInputStream { public S3ACachingInputStream( S3AReadOpContext context, S3ObjectAttributes s3Attributes, - S3AInputStream.InputStreamCallbacks client, + StreamReadCallbacks client, S3AInputStreamStatistics streamStatistics, Configuration conf, LocalDirAllocator localDirAllocator) { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3AInMemoryInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3AInMemoryInputStream.java index e8bfe946f4abf..a64f48dfaa2ec 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3AInMemoryInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3AInMemoryInputStream.java @@ -27,10 +27,10 @@ import org.apache.hadoop.fs.impl.prefetch.BufferData; import org.apache.hadoop.fs.impl.prefetch.FilePosition; -import org.apache.hadoop.fs.s3a.S3AInputStream; import org.apache.hadoop.fs.s3a.S3AReadOpContext; import org.apache.hadoop.fs.s3a.S3ObjectAttributes; import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; +import org.apache.hadoop.fs.s3a.streams.StreamReadCallbacks; /** * Provides an {@code InputStream} that allows reading from an S3 file. @@ -61,7 +61,7 @@ public class S3AInMemoryInputStream extends S3ARemoteInputStream { public S3AInMemoryInputStream( S3AReadOpContext context, S3ObjectAttributes s3Attributes, - S3AInputStream.InputStreamCallbacks client, + StreamReadCallbacks client, S3AInputStreamStatistics streamStatistics) { super(context, s3Attributes, client, streamStatistics); int fileSize = (int) s3Attributes.getLen(); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchingInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchingInputStream.java index 9b9ee12ad7502..f59b5b912ac2f 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchingInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchingInputStream.java @@ -34,10 +34,10 @@ import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.impl.prefetch.Validate; -import org.apache.hadoop.fs.s3a.S3AInputStream; import org.apache.hadoop.fs.s3a.S3AReadOpContext; import org.apache.hadoop.fs.s3a.S3ObjectAttributes; import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; +import org.apache.hadoop.fs.s3a.streams.StreamReadCallbacks; import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.IOStatisticsSource; @@ -90,7 +90,7 @@ public class S3APrefetchingInputStream public S3APrefetchingInputStream( S3AReadOpContext context, S3ObjectAttributes s3Attributes, - S3AInputStream.InputStreamCallbacks client, + StreamReadCallbacks client, S3AInputStreamStatistics streamStatistics, Configuration conf, LocalDirAllocator localDirAllocator) { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteInputStream.java index 38d740bd74f94..3f609b1b9532b 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteInputStream.java @@ -36,11 +36,11 @@ import org.apache.hadoop.fs.impl.prefetch.FilePosition; import org.apache.hadoop.fs.impl.prefetch.Validate; import org.apache.hadoop.fs.s3a.S3AInputPolicy; -import org.apache.hadoop.fs.s3a.S3AInputStream; import org.apache.hadoop.fs.s3a.S3AReadOpContext; import org.apache.hadoop.fs.s3a.S3ObjectAttributes; import org.apache.hadoop.fs.s3a.impl.ChangeTracker; import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; +import org.apache.hadoop.fs.s3a.streams.StreamReadCallbacks; import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.IOStatisticsSource; @@ -98,7 +98,7 @@ public abstract class S3ARemoteInputStream private S3ObjectAttributes s3Attributes; /** Callbacks used for interacting with the underlying S3 client. */ - private S3AInputStream.InputStreamCallbacks client; + private StreamReadCallbacks client; /** Used for reporting input stream access statistics. */ private final S3AInputStreamStatistics streamStatistics; @@ -124,7 +124,7 @@ public abstract class S3ARemoteInputStream public S3ARemoteInputStream( S3AReadOpContext context, S3ObjectAttributes s3Attributes, - S3AInputStream.InputStreamCallbacks client, + StreamReadCallbacks client, S3AInputStreamStatistics streamStatistics) { this.context = requireNonNull(context); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObject.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObject.java index ec6e3700226e0..446db4f88ac64 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObject.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObject.java @@ -36,6 +36,7 @@ import org.apache.hadoop.fs.s3a.impl.ChangeTracker; import org.apache.hadoop.fs.s3a.impl.SDKStreamDrainer; import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; +import org.apache.hadoop.fs.s3a.streams.StreamReadCallbacks; import org.apache.hadoop.fs.statistics.DurationTracker; @@ -60,7 +61,7 @@ public class S3ARemoteObject { /** * Callbacks used for interacting with the underlying S3 client. */ - private final S3AInputStream.InputStreamCallbacks client; + private final StreamReadCallbacks client; /** * Used for reporting input stream access statistics. @@ -100,7 +101,7 @@ public class S3ARemoteObject { public S3ARemoteObject( S3AReadOpContext context, S3ObjectAttributes s3Attributes, - S3AInputStream.InputStreamCallbacks client, + StreamReadCallbacks client, S3AInputStreamStatistics streamStatistics, ChangeTracker changeTracker) { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/streams/AbstractS3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/streams/AbstractS3AInputStream.java new file mode 100644 index 0000000000000..645df6a113ef4 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/streams/AbstractS3AInputStream.java @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.streams; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.FSInputStream; +import org.apache.hadoop.fs.StreamCapabilities; +import org.apache.hadoop.fs.impl.LeakReporter; +import org.apache.hadoop.fs.s3a.S3AInputPolicy; +import org.apache.hadoop.fs.s3a.S3AReadOpContext; +import org.apache.hadoop.fs.s3a.S3ObjectAttributes; +import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.IOStatisticsAggregator; +import org.apache.hadoop.fs.statistics.IOStatisticsSource; +import org.apache.hadoop.fs.statistics.StreamStatisticNames; +import org.apache.hadoop.util.Preconditions; + +import static java.util.Objects.requireNonNull; +import static org.apache.commons.lang3.StringUtils.isNotEmpty; +import static org.apache.hadoop.util.StringUtils.toLowerCase; + +/** + * Base class for input streams returned by the factory, and therefore + * used within S3A code. + */ +public abstract class AbstractS3AInputStream extends FSInputStream + implements StreamCapabilities, IOStatisticsSource { + + private static final Logger LOG = + LoggerFactory.getLogger(AbstractS3AInputStream.class); + + /** + * IOStatistics report. + */ + protected final IOStatistics ioStatistics; + + /** + * Read-specific operation context structure. + */ + private final S3AReadOpContext context; + + /** + * Callbacks for reading input stream data from the S3 Store. + */ + private final StreamReadCallbacks callbacks; + + /** + * Thread pool used for vectored IO operation. + */ + private final ExecutorService boundedThreadPool; + + /** + * URI of path. + */ + private final String uri; + + /** + * Store bucket. + */ + private final String bucket; + + /** + * Store key. + */ + private final String key; + + /** + * Path URI as a string. + */ + private final String pathStr; + + /** + * Content length from HEAD or openFile option. + */ + private final long contentLength; + + private final S3ObjectAttributes objectAttributes; + + /** + * Stream statistics. + */ + private final S3AInputStreamStatistics streamStatistics; + + /** Aggregator used to aggregate per thread IOStatistics. */ + private final IOStatisticsAggregator threadIOStatistics; + + /** + * Report of leaks. + * with report and abort unclosed streams in finalize(). + */ + private final LeakReporter leakReporter; + + /** + * Requested input policy. + */ + private S3AInputPolicy inputPolicy; + + /** + * Constructor. + * @param parameters extensible parameter list. + */ + protected AbstractS3AInputStream( + FactoryStreamParameters parameters) { + + objectAttributes = parameters.getObjectAttributes(); + Preconditions.checkArgument(isNotEmpty(objectAttributes.getBucket()), + "No Bucket"); + Preconditions.checkArgument(isNotEmpty(objectAttributes.getKey()), "No Key"); + long l = objectAttributes.getLen(); + Preconditions.checkArgument(l >= 0, "Negative content length"); + this.context = parameters.getContext(); + this.contentLength = l; + + this.bucket = objectAttributes.getBucket(); + this.key = objectAttributes.getKey(); + this.pathStr = objectAttributes.getPath().toString(); + this.callbacks = parameters.getCallbacks(); + this.uri = "s3a://" + bucket + "/" + key; + this.streamStatistics = parameters.getStreamStatistics(); + this.ioStatistics = streamStatistics.getIOStatistics(); + this.inputPolicy = context.getInputPolicy(); + streamStatistics.inputPolicySet(inputPolicy.ordinal()); + this.boundedThreadPool = parameters.getBoundedThreadPool(); + this.threadIOStatistics = requireNonNull(context.getIOStatisticsAggregator()); + // build the leak reporter + this.leakReporter = new LeakReporter( + "Stream not closed while reading " + uri, + this::isStreamOpen, + () -> abortInFinalizer()); + } + + /** + * Probe for stream being open. + * Not synchronized; the flag is volatile. + * @return true if the stream is still open. + */ + protected abstract boolean isStreamOpen(); + + /** + * Brute force stream close; invoked by {@link LeakReporter}. + * All exceptions raised are ignored. + */ + protected abstract void abortInFinalizer(); + + /** + * Close the stream. + * This triggers publishing of the stream statistics back to the filesystem + * statistics. + * This operation is synchronized, so that only one thread can attempt to + * @throws IOException on any problem + */ + @Override + public synchronized void close() throws IOException { + // end the client+audit span. + callbacks.close(); + // merge the statistics back into the FS statistics. + streamStatistics.close(); + // Collect ThreadLevel IOStats + mergeThreadIOStatistics(streamStatistics.getIOStatistics()); + } + + /** + * Merging the current thread's IOStatistics with the current IOStatistics + * context. + * @param streamIOStats Stream statistics to be merged into thread + * statistics aggregator. + */ + protected void mergeThreadIOStatistics(IOStatistics streamIOStats) { + threadIOStatistics.aggregate(streamIOStats); + } + + /** + * Finalizer. + *

+ * Verify that the inner stream is closed. + *

+ * If it is not, it means streams are being leaked in application code. + * Log a warning, including the stack trace of the caller, + * then abort the stream. + *

+ * This does not attempt to invoke {@link #close()} as that is + * a more complex operation, and this method is being executed + * during a GC finalization phase. + *

+ * Applications MUST close their streams; this is a defensive + * operation to return http connections and warn the end users + * that their applications are at risk of running out of connections. + * + * {@inheritDoc} + */ + @Override + protected void finalize() throws Throwable { + leakReporter.close(); + super.finalize(); + } + + /** + * Get the current input policy. + * @return input policy. + */ + @VisibleForTesting + public S3AInputPolicy getInputPolicy() { + return inputPolicy; + } + + /** + * Set/update the input policy of the stream. + * This updates the stream statistics. + * @param inputPolicy new input policy. + */ + protected void setInputPolicy(S3AInputPolicy inputPolicy) { + LOG.debug("Switching to input policy {}", inputPolicy); + this.inputPolicy = inputPolicy; + streamStatistics.inputPolicySet(inputPolicy.ordinal()); + } + + /** + * Access the input stream statistics. + * This is for internal testing and may be removed without warning. + * @return the statistics for this input stream + */ + @InterfaceAudience.Private + @InterfaceStability.Unstable + @VisibleForTesting + public S3AInputStreamStatistics getS3AStreamStatistics() { + return streamStatistics; + } + + @Override + public IOStatistics getIOStatistics() { + return ioStatistics; + } + + @Override + public boolean hasCapability(String capability) { + switch (toLowerCase(capability)) { + case StreamCapabilities.IOSTATISTICS: + case StreamCapabilities.IOSTATISTICS_CONTEXT: + case StreamStatisticNames.STREAM_LEAKS: + case StreamCapabilities.READAHEAD: + case StreamCapabilities.UNBUFFER: + case StreamCapabilities.VECTOREDIO: + return true; + default: + return false; + } + } + + + protected S3AReadOpContext getContext() { + return context; + } + + protected StreamReadCallbacks getCallbacks() { + return callbacks; + } + + protected ExecutorService getBoundedThreadPool() { + return boundedThreadPool; + } + + protected String getUri() { + return uri; + } + + protected String getBucket() { + return bucket; + } + + protected String getKey() { + return key; + } + + protected String getPathStr() { + return pathStr; + } + + protected long getContentLength() { + return contentLength; + } + + protected S3AInputStreamStatistics getStreamStatistics() { + return streamStatistics; + } + + protected IOStatisticsAggregator getThreadIOStatistics() { + return threadIOStatistics; + } + + protected S3ObjectAttributes getObjectAttributes() { + return objectAttributes; + } +} + + diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/streams/ClassicInputStreamFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/streams/ClassicInputStreamFactory.java new file mode 100644 index 0000000000000..a2720b0d05058 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/streams/ClassicInputStreamFactory.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.streams; + +import java.io.IOException; + +import org.apache.hadoop.fs.s3a.S3AInputStream; +import org.apache.hadoop.service.AbstractService; + +/** + * Factory of classic {@link S3AInputStream} instances. + */ +public class ClassicInputStreamFactory extends AbstractService + implements InputStreamFactory { + + public ClassicInputStreamFactory() { + super("ClassicInputStreamFactory"); + } + + @Override + public AbstractS3AInputStream create(final FactoryStreamParameters parameters) + throws IOException { + return new S3AInputStream(parameters); + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/streams/FactoryStreamParameters.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/streams/FactoryStreamParameters.java new file mode 100644 index 0000000000000..c9f2208c505d9 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/streams/FactoryStreamParameters.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.streams; + +import java.util.concurrent.ExecutorService; + +import org.apache.hadoop.fs.s3a.S3AReadOpContext; +import org.apache.hadoop.fs.s3a.S3ObjectAttributes; +import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; + +import static java.util.Objects.requireNonNull; + +/** + * Parameters for input streams created through + * {@link InputStreamFactory}. + * It is designed to be extensible; the {@link #build()} + * operation does not freeze the parameters -instead it simply + * verifies that all required values are set. + */ +public final class FactoryStreamParameters { + + private S3AReadOpContext context; + + private S3ObjectAttributes objectAttributes; + + private StreamReadCallbacks callbacks; + + private S3AInputStreamStatistics streamStatistics; + + private ExecutorService boundedThreadPool; + + /** + * Read operation context. + */ + public S3AReadOpContext getContext() { + return context; + } + + /** + * Set builder value. + * @param value new value + * @return the builder + */ + public FactoryStreamParameters withContext(S3AReadOpContext value) { + context = value; + return this; + } + + /** + * Attributes of the object. + */ + public S3ObjectAttributes getObjectAttributes() { + return objectAttributes; + } + + /** + * Set builder value. + * @param value new value + * @return the builder + */ + public FactoryStreamParameters withObjectAttributes(S3ObjectAttributes value) { + objectAttributes = value; + return this; + } + + /** + * Callbacks to the store. + */ + public StreamReadCallbacks getCallbacks() { + return callbacks; + } + + /** + * Set builder value. + * @param value new value + * @return the builder + */ + public FactoryStreamParameters withCallbacks(StreamReadCallbacks value) { + callbacks = value; + return this; + } + + /** + * Stream statistics. + */ + public S3AInputStreamStatistics getStreamStatistics() { + return streamStatistics; + } + + /** + * Set builder value. + * @param value new value + * @return the builder + */ + public FactoryStreamParameters withStreamStatistics(S3AInputStreamStatistics value) { + streamStatistics = value; + return this; + } + + /** + * Bounded thread pool for submitting asynchronous + * work. + */ + public ExecutorService getBoundedThreadPool() { + return boundedThreadPool; + } + + /** + * Set builder value. + * @param value new value + * @return the builder + */ + public FactoryStreamParameters withBoundedThreadPool(ExecutorService value) { + boundedThreadPool = value; + return this; + } + + /** + * Validate that all attributes are as expected. + * Mock tests can skip this if required. + * @return the object. + */ + public FactoryStreamParameters build() { + requireNonNull(boundedThreadPool, "boundedThreadPool"); + requireNonNull(callbacks, "callbacks"); + requireNonNull(context, "context"); + requireNonNull(objectAttributes, "objectAttributes"); + requireNonNull(streamStatistics, "streamStatistics"); + requireNonNull(boundedThreadPool, "boundedThreadPool"); + return this; + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/streams/InputStreamFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/streams/InputStreamFactory.java new file mode 100644 index 0000000000000..56b158e50f691 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/streams/InputStreamFactory.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.streams; + +import java.io.IOException; + +import org.apache.hadoop.service.Service; + +/** + * A Factory for input streams. + *

+ * This class is instantiated during initialization of + * {@code S3AStore}, it then follows the same service + * lifecycle. + *

+ * Note for maintainers: do try and keep this mostly stable. + * If new parameters need to be added, expand the + * {@link FactoryStreamParameters} class, rather than change the + * interface signature. + */ +public interface InputStreamFactory extends Service { + + /** + * Create a new input stream. + * @param parameters parameters. + * @return the input stream + * @throws problem creating the stream. + */ + AbstractS3AInputStream create(FactoryStreamParameters parameters) + throws IOException; + +} + diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/streams/StreamReadCallbacks.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/streams/StreamReadCallbacks.java new file mode 100644 index 0000000000000..c4441af07a2e2 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/streams/StreamReadCallbacks.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.streams; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +import software.amazon.awssdk.core.ResponseInputStream; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; + +import org.apache.hadoop.fs.s3a.Retries; +import org.apache.hadoop.util.functional.CallableRaisingIOE; + +/** + * Callbacks for reading input stream data from the S3 Store. + */ +public interface StreamReadCallbacks extends Closeable { + + /** + * Create a GET request builder. + * @param key object key + * @return the request builder + */ + GetObjectRequest.Builder newGetRequestBuilder(String key); + + /** + * Execute the request. + * When CSE is enabled with reading of unencrypted data, The object is checked if it is + * encrypted and if so, the request is made with encrypted S3 client. If the object is + * not encrypted, the request is made with unencrypted s3 client. + * @param request the request + * @return the response + * @throws IOException on any failure. + */ + @Retries.OnceRaw + ResponseInputStream getObject(GetObjectRequest request) throws IOException; + + /** + * Submit some asynchronous work, for example, draining a stream. + * @param operation operation to invoke + * @param return type + * @return a future. + */ + CompletableFuture submit(CallableRaisingIOE operation); + +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/streams/package-info.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/streams/package-info.java new file mode 100644 index 0000000000000..bb79fb129ef2b --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/streams/package-info.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Stream factory and support. + */ + +@InterfaceAudience.LimitedPrivate("Extension Libraries") +@InterfaceStability.Unstable +package org.apache.hadoop.fs.s3a.streams; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractSeek.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractSeek.java index dd41583de3fe4..60e3d66317a4c 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractSeek.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractSeek.java @@ -84,7 +84,7 @@ public class ITestS3AContractSeek extends AbstractContractSeekTest { * which S3A Supports. * @return a list of seek policies to test. */ - @Parameterized.Parameters + @Parameterized.Parameters(name="policy={0}") public static Collection params() { return Arrays.asList(new Object[][]{ {FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL, Default_JSSE}, diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java index 24115177f35a2..97af80e70a542 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java @@ -58,6 +58,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.alias.CredentialProvider; import org.apache.hadoop.security.alias.CredentialProviderFactory; +import org.apache.hadoop.test.AbstractHadoopTestBase; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.VersionInfo; import org.apache.http.HttpStatus; @@ -75,7 +76,7 @@ /** * S3A tests for configuration, especially credentials. */ -public class ITestS3AConfiguration { +public class ITestS3AConfiguration extends AbstractHadoopTestBase { private static final String EXAMPLE_ID = "AKASOMEACCESSKEY"; private static final String EXAMPLE_KEY = "RGV0cm9pdCBSZ/WQgY2xl/YW5lZCB1cAEXAMPLE"; @@ -487,11 +488,20 @@ public void testDirectoryAllocatorDefval() throws Throwable { conf = new Configuration(); conf.unset(Constants.BUFFER_DIR); fs = S3ATestUtils.createTestFileSystem(conf); - File tmp = fs.createTmpFileForWrite("out-", 1024, conf); + File tmp = createTemporaryFileForWriting(); assertTrue("not found: " + tmp, tmp.exists()); tmp.delete(); } + /** + * Create a temporary file for writing; requires the FS to have been created/initialized. + * @return a temporary file + * @throws IOException creation issues. + */ + private File createTemporaryFileForWriting() throws IOException { + return fs.getS3AInternals().getStore().createTemporaryFileForWriting("out-", 1024, conf); + } + @Test public void testDirectoryAllocatorRR() throws Throwable { File dir1 = GenericTestUtils.getRandomizedTestDir(); @@ -501,9 +511,9 @@ public void testDirectoryAllocatorRR() throws Throwable { conf = new Configuration(); conf.set(Constants.BUFFER_DIR, dir1 + ", " + dir2); fs = S3ATestUtils.createTestFileSystem(conf); - File tmp1 = fs.createTmpFileForWrite("out-", 1024, conf); + File tmp1 = createTemporaryFileForWriting(); tmp1.delete(); - File tmp2 = fs.createTmpFileForWrite("out-", 1024, conf); + File tmp2 = createTemporaryFileForWriting(); tmp2.delete(); assertNotEquals("round robin not working", tmp1.getParent(), tmp2.getParent()); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java index 6eccdc23dd5d5..a0a12eb1b3899 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java @@ -40,6 +40,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.audit.impl.NoopSpan; import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets; +import org.apache.hadoop.fs.s3a.streams.FactoryStreamParameters; +import org.apache.hadoop.fs.s3a.streams.StreamReadCallbacks; import org.apache.hadoop.util.functional.CallableRaisingIOE; import org.apache.http.NoHttpResponseException; @@ -164,7 +166,7 @@ private static void assertReadValueMatchesOffset( * @return a stream. */ private S3AInputStream getMockedS3AInputStream( - S3AInputStream.InputStreamCallbacks streamCallback) { + StreamReadCallbacks streamCallback) { Path path = new Path("test-path"); String eTag = "test-etag"; String versionId = "test-version-id"; @@ -187,12 +189,14 @@ private S3AInputStream getMockedS3AInputStream( s3AFileStatus, NoopSpan.INSTANCE); - return new S3AInputStream( - s3AReadOpContext, - s3ObjectAttributes, - streamCallback, - s3AReadOpContext.getS3AStatisticsContext().newInputStreamStatistics(), - null); + FactoryStreamParameters parameters = new FactoryStreamParameters() + .withCallbacks(streamCallback) + .withObjectAttributes(s3ObjectAttributes) + .withContext(s3AReadOpContext) + .withStreamStatistics( s3AReadOpContext.getS3AStatisticsContext().newInputStreamStatistics()) + .withBoundedThreadPool(null); + + return new S3AInputStream(parameters); } /** @@ -203,7 +207,7 @@ private S3AInputStream getMockedS3AInputStream( * @param ex exception to raise on failure * @return mocked object. */ - private S3AInputStream.InputStreamCallbacks failingInputStreamCallbacks( + private StreamReadCallbacks failingInputStreamCallbacks( final RuntimeException ex) { GetObjectResponse objectResponse = GetObjectResponse.builder() @@ -238,7 +242,7 @@ private S3AInputStream.InputStreamCallbacks failingInputStreamCallbacks( * @param ex exception to raise on failure * @return mocked object. */ - private S3AInputStream.InputStreamCallbacks maybeFailInGetCallback( + private StreamReadCallbacks maybeFailInGetCallback( final RuntimeException ex, final Function failurePredicate) { GetObjectResponse objectResponse = GetObjectResponse.builder() @@ -259,13 +263,13 @@ private S3AInputStream.InputStreamCallbacks maybeFailInGetCallback( * @param streamFactory factory for the stream to return on the given attempt. * @return mocked object. */ - private S3AInputStream.InputStreamCallbacks mockInputStreamCallback( + private StreamReadCallbacks mockInputStreamCallback( final RuntimeException ex, final Function failurePredicate, final Function> streamFactory) { - return new S3AInputStream.InputStreamCallbacks() { + return new StreamReadCallbacks() { private int attempt = 0; @Override diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/MockS3ARemoteObject.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/MockS3ARemoteObject.java index 5fbbc3a127997..4a572441c56b7 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/MockS3ARemoteObject.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/MockS3ARemoteObject.java @@ -29,8 +29,8 @@ import software.amazon.awssdk.services.s3.model.GetObjectResponse; import org.apache.hadoop.fs.impl.prefetch.Validate; -import org.apache.hadoop.fs.s3a.S3AInputStream; import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext; +import org.apache.hadoop.fs.s3a.streams.StreamReadCallbacks; import org.apache.hadoop.util.functional.CallableRaisingIOE; /** @@ -95,8 +95,8 @@ public static byte byteAtOffset(int offset) { return (byte) (offset % 128); } - public static S3AInputStream.InputStreamCallbacks createClient(String bucketName) { - return new S3AInputStream.InputStreamCallbacks() { + public static StreamReadCallbacks createClient(String bucketName) { + return new StreamReadCallbacks() { @Override public ResponseInputStream getObject( GetObjectRequest request) { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchFakes.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchFakes.java index 1c509702188f3..eb7ed4fcaabfe 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchFakes.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchFakes.java @@ -52,7 +52,6 @@ import org.apache.hadoop.fs.s3a.S3AEncryptionMethods; import org.apache.hadoop.fs.s3a.S3AFileStatus; import org.apache.hadoop.fs.s3a.S3AInputPolicy; -import org.apache.hadoop.fs.s3a.S3AInputStream; import org.apache.hadoop.fs.s3a.S3AReadOpContext; import org.apache.hadoop.fs.s3a.S3ObjectAttributes; import org.apache.hadoop.fs.s3a.VectoredIOContext; @@ -62,6 +61,7 @@ import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext; import org.apache.hadoop.fs.s3a.statistics.impl.CountingChangeTracker; import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext; +import org.apache.hadoop.fs.s3a.streams.StreamReadCallbacks; import org.apache.hadoop.fs.statistics.DurationTrackerFactory; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; @@ -187,7 +187,7 @@ public static ResponseInputStream createS3ObjectInputStream( AbortableInputStream.create(new ByteArrayInputStream(buffer), () -> {})); } - public static S3AInputStream.InputStreamCallbacks createInputStreamCallbacks( + public static StreamReadCallbacks createInputStreamCallbacks( String bucket) { GetObjectResponse objectResponse = GetObjectResponse.builder() @@ -197,7 +197,7 @@ public static S3AInputStream.InputStreamCallbacks createInputStreamCallbacks( ResponseInputStream responseInputStream = createS3ObjectInputStream(objectResponse, new byte[8]); - return new S3AInputStream.InputStreamCallbacks() { + return new StreamReadCallbacks() { @Override public ResponseInputStream getObject(GetObjectRequest request) { return responseInputStream; @@ -238,7 +238,7 @@ public static S3ARemoteInputStream createInputStream( prefetchBlockSize, prefetchBlockCount); - S3AInputStream.InputStreamCallbacks callbacks = + StreamReadCallbacks callbacks = createInputStreamCallbacks(bucket); S3AInputStreamStatistics stats = s3AReadOpContext.getS3AStatisticsContext().newInputStreamStatistics(); @@ -289,7 +289,7 @@ public static class FakeS3AInMemoryInputStream public FakeS3AInMemoryInputStream( S3AReadOpContext context, S3ObjectAttributes s3Attributes, - S3AInputStream.InputStreamCallbacks client, + StreamReadCallbacks client, S3AInputStreamStatistics streamStatistics) { super(context, s3Attributes, client, streamStatistics); } @@ -391,7 +391,7 @@ public static class FakeS3ACachingInputStream extends S3ACachingInputStream { public FakeS3ACachingInputStream( S3AReadOpContext context, S3ObjectAttributes s3Attributes, - S3AInputStream.InputStreamCallbacks client, + StreamReadCallbacks client, S3AInputStreamStatistics streamStatistics) { super(context, s3Attributes, client, streamStatistics, CONF, new LocalDirAllocator( diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ARemoteInputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ARemoteInputStream.java index 8ce26033c1182..35a08cc792a98 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ARemoteInputStream.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ARemoteInputStream.java @@ -31,11 +31,11 @@ import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.impl.prefetch.ExceptionAsserts; import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool; -import org.apache.hadoop.fs.s3a.S3AInputStream; import org.apache.hadoop.fs.s3a.S3AReadOpContext; import org.apache.hadoop.fs.s3a.S3ATestUtils; import org.apache.hadoop.fs.s3a.S3ObjectAttributes; import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; +import org.apache.hadoop.fs.s3a.streams.StreamReadCallbacks; import org.apache.hadoop.test.AbstractHadoopTestBase; import static org.assertj.core.api.Assertions.assertThat; @@ -53,7 +53,7 @@ public class TestS3ARemoteInputStream extends AbstractHadoopTestBase { private final ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(threadPool); - private final S3AInputStream.InputStreamCallbacks client = + private final StreamReadCallbacks client = MockS3ARemoteObject.createClient("bucket"); @Test diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ARemoteObject.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ARemoteObject.java index b3788aac80834..6cee43bb6c6b5 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ARemoteObject.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ARemoteObject.java @@ -26,11 +26,11 @@ import org.apache.hadoop.fs.impl.prefetch.ExceptionAsserts; import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool; -import org.apache.hadoop.fs.s3a.S3AInputStream; import org.apache.hadoop.fs.s3a.S3AReadOpContext; import org.apache.hadoop.fs.s3a.S3ObjectAttributes; import org.apache.hadoop.fs.s3a.impl.ChangeTracker; import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; +import org.apache.hadoop.fs.s3a.streams.StreamReadCallbacks; import org.apache.hadoop.test.AbstractHadoopTestBase; public class TestS3ARemoteObject extends AbstractHadoopTestBase { @@ -40,7 +40,7 @@ public class TestS3ARemoteObject extends AbstractHadoopTestBase { private final ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(threadPool); - private final S3AInputStream.InputStreamCallbacks client = + private final StreamReadCallbacks client = MockS3ARemoteObject.createClient("bucket"); @Test diff --git a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties index 7442a357f9777..f548e7c98f9eb 100644 --- a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties +++ b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties @@ -98,3 +98,7 @@ log4j.logger.org.apache.hadoop.fs.s3a.S3AStorageStatistics=INFO # uncomment this to get S3 Delete requests to return the list of deleted objects # log4.logger.org.apache.hadoop.fs.s3a.impl.RequestFactoryImpl=TRACE +# debug service lifecycle of components such as S3AStore and +# services it launches itself. +log4.logger.org.apache.hadoop.service=DEBUG +