HADOOP-19354. S3AInputStream to be created by factory under S3AStore
First iteration
* Factory interface whose creation method takes a single parameter object
* Base class AbstractS3AInputStream which all streams extend
* S3AInputStream subclasses it and gets its own factory (ClassicInputStreamFactory)
* Production and test code updated to use it

Not done
* Input stream callbacks pushed down to S3Store
* S3Store to dynamically choose factory at startup, stop in close()
* S3Store to implement the factory interface, completing final binding
  operations (callbacks, stats)

Change-Id: I8d0f86ca1f3463d4987a43924f155ce0c0644180
steveloughran committed Dec 6, 2024
1 parent 86d8fa6 commit 7d76047
Showing 25 changed files with 963 additions and 377 deletions.
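
For orientation, a rough sketch of the shape of the new factory API as it is exercised by the call sites in this diff. The type and method names (InputStreamFactory, ClassicInputStreamFactory, FactoryStreamParameters, StreamReadCallbacks, init()/start()/create() and the with*() builder calls) all appear in the change below; the parameter types, supertype and javadoc here are inferred from usage, not copied from the committed interfaces.

// Sketch only: reconstructed from how executeOpen() uses the factory below.
package org.apache.hadoop.fs.s3a.streams;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

public interface InputStreamFactory {

  /** Service-style lifecycle: configure the factory. */
  void init(Configuration conf);

  /** Service-style lifecycle: start the factory before any streams are created. */
  void start();

  /**
   * Create a stream for a single S3 object.
   * All per-stream inputs (read context, object attributes, StreamReadCallbacks,
   * stream statistics, bounded thread pool) arrive in one FactoryStreamParameters
   * object assembled via its with*() builder methods and build().
   */
  AbstractS3AInputStream create(FactoryStreamParameters parameters)
      throws IOException;
}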
@@ -152,6 +152,10 @@
import org.apache.hadoop.fs.s3a.impl.UploadContentProviders;
import org.apache.hadoop.fs.s3a.impl.CSEUtils;
import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream;
import org.apache.hadoop.fs.s3a.streams.ClassicInputStreamFactory;
import org.apache.hadoop.fs.s3a.streams.FactoryStreamParameters;
import org.apache.hadoop.fs.s3a.streams.InputStreamFactory;
import org.apache.hadoop.fs.s3a.streams.StreamReadCallbacks;
import org.apache.hadoop.fs.s3a.tools.MarkerToolOperations;
import org.apache.hadoop.fs.s3a.tools.MarkerToolOperationsImpl;
import org.apache.hadoop.fs.statistics.DurationTracker;
@@ -305,9 +309,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,

private String username;

/**
* Store back end.
*/
private S3AStore store;

/**
@@ -357,7 +358,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
/** Log to warn of storage class configuration problems. */
private static final LogExactlyOnce STORAGE_CLASS_WARNING = new LogExactlyOnce(LOG);

private LocalDirAllocator directoryAllocator;
private String cannedACL;

/**
@@ -803,12 +803,12 @@ public void initialize(URI name, Configuration originalConf)
s3AccessGrantsEnabled = conf.getBoolean(AWS_S3_ACCESS_GRANTS_ENABLED, false);

int rateLimitCapacity = intOption(conf, S3A_IO_RATE_LIMIT, DEFAULT_S3A_IO_RATE_LIMIT, 0);
// now create the store
// now create and initialize the store
store = createS3AStore(clientManager, rateLimitCapacity);
// the s3 client is created through the store, rather than
// directly through the client manager.
// this is to aid mocking.
s3Client = store.getOrCreateS3Client();
s3Client = getStore().getOrCreateS3Client();
// The filesystem is now ready to perform operations against
// S3
// This initiates a probe against S3 for the bucket existing.
@@ -851,7 +851,7 @@ private S3AFileSystemOperations createFileSystemHandler() {


/**
* Create the S3AStore instance.
* Create and start the S3AStore instance.
* This is protected so that tests can override it.
* @param clientManager client manager
* @param rateLimitCapacity rate limit
@@ -860,7 +860,7 @@ private S3AFileSystemOperations createFileSystemHandler() {
@VisibleForTesting
protected S3AStore createS3AStore(final ClientManager clientManager,
final int rateLimitCapacity) {
return new S3AStoreBuilder()
final S3AStore st = new S3AStoreBuilder()
.withAuditSpanSource(getAuditManager())
.withClientManager(clientManager)
.withDurationTrackerFactory(getDurationTrackerFactory())
@@ -872,6 +872,9 @@ protected S3AStore createS3AStore(final ClientManager clientManager,
.withReadRateLimiter(unlimitedRate())
.withWriteRateLimiter(RateLimitingFactory.create(rateLimitCapacity))
.build();
st.init(getConf());
st.start();
return st;
}

/**
@@ -1344,6 +1347,15 @@ public FlagSet<PerformanceFlagEnum> getPerformanceFlags() {
return performanceFlags;
}


/**
* Get the store for low-level operations.
* @return the store the S3A FS is working through.
*/
private S3AStore getStore() {
return store;
}

/**
* Implementation of all operations used by delegation tokens.
*/
@@ -1549,7 +1561,7 @@ public S3Client getAmazonS3Client(String reason) {

@Override
public S3AStore getStore() {
return store;
return S3AFileSystem.this.getStore();
}

/**
@@ -1678,28 +1690,8 @@ public S3AEncryptionMethods getS3EncryptionAlgorithm() {
*/
File createTmpFileForWrite(String pathStr, long size,
Configuration conf) throws IOException {
initLocalDirAllocatorIfNotInitialized(conf);
Path path = directoryAllocator.getLocalPathForWrite(pathStr,
size, conf);
File dir = new File(path.getParent().toUri().getPath());
String prefix = path.getName();
// create a temp file on this directory
return File.createTempFile(prefix, null, dir);
}

/**
* Initialize dir allocator if not already initialized.
*
* @param conf The Configuration object.
*/
private void initLocalDirAllocatorIfNotInitialized(Configuration conf) {
if (directoryAllocator == null) {
synchronized (this) {
String bufferDir = conf.get(BUFFER_DIR) != null
? BUFFER_DIR : HADOOP_TMP_DIR;
directoryAllocator = new LocalDirAllocator(bufferDir);
}
}
return getS3AInternals().getStore().createTemporaryFileForWriting(pathStr, size, conf);
}
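
The buffer-directory logic deleted above presumably moves behind the store as createTemporaryFileForWriting(); a minimal sketch of that store-side method, assuming it simply reuses the removed lines together with the store's lazily created LocalDirAllocator (see getDirectoryAllocator() further down):

/** Sketch only: the removed allocator logic relocated into the S3AStore implementation. */
public File createTemporaryFileForWriting(String pathStr,
    long size,
    Configuration conf) throws IOException {
  // allocator is created on demand from BUFFER_DIR, falling back to HADOOP_TMP_DIR
  LocalDirAllocator allocator = getDirectoryAllocator();
  Path path = allocator.getLocalPathForWrite(pathStr, size, conf);
  File dir = new File(path.getParent().toUri().getPath());
  String prefix = path.getName();
  // create a temp file in that directory
  return File.createTempFile(prefix, null, dir);
}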

/**
@@ -1894,35 +1886,46 @@ private FSDataInputStream executeOpen(

if (this.prefetchEnabled) {
Configuration configuration = getConf();
initLocalDirAllocatorIfNotInitialized(configuration);

return new FSDataInputStream(
new S3APrefetchingInputStream(
readContext.build(),
createObjectAttributes(path, fileStatus),
createInputStreamCallbacks(auditSpan),
inputStreamStats,
configuration,
directoryAllocator));
getStore().getDirectoryAllocator()));
} else {

// create the factory.
// TODO: move into S3AStore and export the factory API through
// the store, which will add some of the features (callbacks, stats)
// before invoking the real factory
InputStreamFactory factory = new ClassicInputStreamFactory();
factory.init(getConf());
factory.start();
FactoryStreamParameters parameters = new FactoryStreamParameters()
.withCallbacks(createInputStreamCallbacks(auditSpan))
.withObjectAttributes(createObjectAttributes(path, fileStatus))
.withContext(readContext.build())
.withStreamStatistics(inputStreamStats)
.withBoundedThreadPool(new SemaphoredDelegatingExecutor(
boundedThreadPool,
vectoredActiveRangeReads,
true,
inputStreamStats))
.build();

return new FSDataInputStream(
new S3AInputStream(
readContext.build(),
createObjectAttributes(path, fileStatus),
createInputStreamCallbacks(auditSpan),
inputStreamStats,
new SemaphoredDelegatingExecutor(
boundedThreadPool,
vectoredActiveRangeReads,
true,
inputStreamStats)));
factory.create(parameters));
}
}
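
Per the TODO above and the "Not done" list in the commit message, a follow-up is expected to move the factory behind S3AStore, with the store adding callbacks and statistics before delegating to whichever factory it chose at startup. The call site could then presumably shrink to something like the following; a create() method exported by the store is hypothetical at this point:

// Hypothetical future call site, once the store exports the factory API.
return new FSDataInputStream(getStore().create(parameters));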

/**
* Override point: create the callbacks for S3AInputStream.
* @return an implementation of the InputStreamCallbacks,
*/
private S3AInputStream.InputStreamCallbacks createInputStreamCallbacks(
private StreamReadCallbacks createInputStreamCallbacks(
final AuditSpan auditSpan) {
return new InputStreamCallbacksImpl(auditSpan);
}
@@ -1931,7 +1934,7 @@ private S3AInputStream.InputStreamCallbacks createInputStreamCallbacks(
* Operations needed by S3AInputStream to read data.
*/
private final class InputStreamCallbacksImpl implements
S3AInputStream.InputStreamCallbacks {
StreamReadCallbacks {

/**
* Audit span to activate before each call.
@@ -1967,7 +1970,7 @@ public ResponseInputStream<GetObjectResponse> getObject(GetObjectRequest request
IOException {
// active the audit span used for the operation
try (AuditSpan span = auditSpan.activate()) {
return fsHandler.getObject(store, request, getRequestFactory());
return fsHandler.getObject(getStore(), request, getRequestFactory());
}
}

@@ -1997,7 +2000,7 @@ private final class WriteOperationHelperCallbacksImpl
@Retries.OnceRaw
public CompleteMultipartUploadResponse completeMultipartUpload(
CompleteMultipartUploadRequest request) {
return store.completeMultipartUpload(request);
return getStore().completeMultipartUpload(request);
}

@Override
@@ -2007,7 +2010,7 @@ public UploadPartResponse uploadPart(
final RequestBody body,
final DurationTrackerFactory durationTrackerFactory)
throws AwsServiceException, UncheckedIOException {
return store.uploadPart(request, body, durationTrackerFactory);
return getStore().uploadPart(request, body, durationTrackerFactory);
}

/**
@@ -2804,7 +2807,7 @@ public long getDefaultBlockSize(Path path) {
*/
@Override
public long getObjectSize(S3Object s3Object) throws IOException {
return fsHandler.getS3ObjectSize(s3Object.key(), s3Object.size(), store, null);
return fsHandler.getS3ObjectSize(s3Object.key(), s3Object.size(), getStore(), null);
}

@Override
@@ -3035,7 +3038,7 @@ protected DurationTrackerFactory getDurationTrackerFactory() {
*/
protected DurationTrackerFactory nonNullDurationTrackerFactory(
DurationTrackerFactory factory) {
return store.nonNullDurationTrackerFactory(factory);
return getStore().nonNullDurationTrackerFactory(factory);
}

/**
Expand Down Expand Up @@ -3073,7 +3076,7 @@ protected HeadObjectResponse getObjectMetadata(String key,
ChangeTracker changeTracker,
Invoker changeInvoker,
String operation) throws IOException {
return store.headObject(key, changeTracker, changeInvoker, fsHandler, operation);
return getStore().headObject(key, changeTracker, changeInvoker, fsHandler, operation);
}

/**
Expand Down Expand Up @@ -3221,7 +3224,7 @@ public void incrementWriteOperations() {
protected void deleteObject(String key)
throws SdkException, IOException {
incrementWriteOperations();
store.deleteObject(getRequestFactory()
getStore().deleteObject(getRequestFactory()
.newDeleteObjectRequestBuilder(key)
.build());
}
@@ -3275,7 +3278,7 @@ void deleteObjectAtPath(Path f,
private DeleteObjectsResponse deleteObjects(DeleteObjectsRequest deleteRequest)
throws MultiObjectDeleteException, SdkException, IOException {
incrementWriteOperations();
DeleteObjectsResponse response = store.deleteObjects(deleteRequest).getValue();
DeleteObjectsResponse response = getStore().deleteObjects(deleteRequest).getValue();
if (!response.errors().isEmpty()) {
throw new MultiObjectDeleteException(response.errors());
}
@@ -3318,7 +3321,7 @@ public PutObjectRequest.Builder newPutObjectRequestBuilder(String key,
@Retries.OnceRaw
public UploadInfo putObject(PutObjectRequest putObjectRequest, File file,
ProgressableProgressListener listener) throws IOException {
return store.putObject(putObjectRequest, file, listener);
return getStore().putObject(putObjectRequest, file, listener);
}

/**
@@ -3419,7 +3422,7 @@ UploadPartResponse uploadPart(UploadPartRequest request, RequestBody body,
* @param bytes bytes in the request.
*/
protected void incrementPutStartStatistics(long bytes) {
store.incrementPutStartStatistics(bytes);
getStore().incrementPutStartStatistics(bytes);
}

/**
Expand All @@ -3430,7 +3433,7 @@ protected void incrementPutStartStatistics(long bytes) {
* @param bytes bytes in the request.
*/
protected void incrementPutCompletedStatistics(boolean success, long bytes) {
store.incrementPutCompletedStatistics(success, bytes);
getStore().incrementPutCompletedStatistics(success, bytes);
}

/**
Expand All @@ -3441,7 +3444,7 @@ protected void incrementPutCompletedStatistics(boolean success, long bytes) {
* @param bytes bytes successfully uploaded.
*/
protected void incrementPutProgressStatistics(String key, long bytes) {
store.incrementPutProgressStatistics(key, bytes);
getStore().incrementPutProgressStatistics(key, bytes);
}

/**
Expand Down Expand Up @@ -4321,9 +4324,9 @@ PutObjectResponse executePut(
String key = putObjectRequest.key();
long len = getPutRequestLength(putObjectRequest);
ProgressableProgressListener listener =
new ProgressableProgressListener(store, putObjectRequest.key(), progress);
new ProgressableProgressListener(getStore(), putObjectRequest.key(), progress);
UploadInfo info = putObject(putObjectRequest, file, listener);
PutObjectResponse result = store.waitForUploadCompletion(key, info).response();
PutObjectResponse result = getStore().waitForUploadCompletion(key, info).response();
listener.uploadCompleted(info.getFileUpload());

// post-write actions
@@ -4421,7 +4424,7 @@ public void close() throws IOException {
protected synchronized void stopAllServices() {
try {
trackDuration(getDurationTrackerFactory(), FILESYSTEM_CLOSE.getSymbol(), () -> {
closeAutocloseables(LOG, store);
closeAutocloseables(LOG, getStore());
store = null;
s3Client = null;

@@ -4642,7 +4645,7 @@ private CopyObjectResponse copyFile(String srcKey, String dstKey, long size,
() -> {
incrementStatistic(OBJECT_COPY_REQUESTS);

Copy copy = store.getOrCreateTransferManager().copy(
Copy copy = getStore().getOrCreateTransferManager().copy(
CopyRequest.builder()
.copyObjectRequest(copyRequest)
.build());
@@ -5857,7 +5860,7 @@ public BulkDelete createBulkDelete(final Path path)
*/
protected BulkDeleteOperation.BulkDeleteOperationCallbacks createBulkDeleteCallbacks(
Path path, int pageSize, AuditSpanS3A span) {
return new BulkDeleteOperationCallbacksImpl(store, pathToKey(path), pageSize, span);
return new BulkDeleteOperationCallbacksImpl(getStore(), pathToKey(path), pageSize, span);
}

}
(Diffs of the remaining changed files not shown.)