Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HADOOP-18959 S3A : Use builder for prefetch CachingBlockManager #6240

Merged
merged 5 commits into from
Nov 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hadoop.fs.impl.prefetch;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;

/**
* This class is used to provide parameters to {@link BlockManager}.
*/
@InterfaceAudience.Private
public final class BlockManagerParameters {

/**
* Asynchronous tasks are performed in this pool.
*/
private ExecutorServiceFuturePool futurePool;

/**
* Information about each block of the underlying file.
*/
private BlockData blockData;

/**
* Size of the in-memory cache in terms of number of blocks.
*/
private int bufferPoolSize;

/**
* Statistics for the stream.
*/
private PrefetchingStatistics prefetchingStatistics;

/**
* The configuration object.
*/
private Configuration conf;

/**
* The local dir allocator instance.
*/
private LocalDirAllocator localDirAllocator;

/**
* Max blocks count to be kept in cache at any time.
*/
private int maxBlocksCount;

/**
* Tracker with statistics to update.
*/
private DurationTrackerFactory trackerFactory;

/**
* @return The Executor future pool to perform async prefetch tasks.
*/
public ExecutorServiceFuturePool getFuturePool() {
return futurePool;
}

/**
* @return The object holding blocks data info for the underlying file.
*/
public BlockData getBlockData() {
return blockData;
}

/**
* @return The size of the in-memory cache.
*/
public int getBufferPoolSize() {
return bufferPoolSize;
}

/**
* @return The prefetching statistics for the stream.
*/
public PrefetchingStatistics getPrefetchingStatistics() {
return prefetchingStatistics;
}

/**
* @return The configuration object.
*/
public Configuration getConf() {
return conf;
}

/**
* @return The local dir allocator instance.
*/
public LocalDirAllocator getLocalDirAllocator() {
return localDirAllocator;
}

/**
* @return The max blocks count to be kept in cache at any time.
*/
public int getMaxBlocksCount() {
return maxBlocksCount;
}

/**
* @return The duration tracker with statistics to update.
*/
public DurationTrackerFactory getTrackerFactory() {
return trackerFactory;
}

/**
* Sets the executor service future pool that is later used to perform
* async prefetch tasks.
*
* @param pool The future pool.
* @return The builder.
*/
public BlockManagerParameters withFuturePool(
final ExecutorServiceFuturePool pool) {
this.futurePool = pool;
return this;
}

/**
* Sets the object holding blocks data info for the underlying file.
*
* @param data The block data object.
* @return The builder.
*/
public BlockManagerParameters withBlockData(
final BlockData data) {
this.blockData = data;
return this;
}

/**
* Sets the in-memory cache size as number of blocks.
*
* @param poolSize The buffer pool size as number of blocks.
* @return The builder.
*/
public BlockManagerParameters withBufferPoolSize(
final int poolSize) {
this.bufferPoolSize = poolSize;
return this;
}

/**
* Sets the prefetching statistics for the stream.
*
* @param statistics The prefetching statistics.
* @return The builder.
*/
public BlockManagerParameters withPrefetchingStatistics(
final PrefetchingStatistics statistics) {
this.prefetchingStatistics = statistics;
return this;
}

/**
* Sets the configuration object.
*
* @param configuration The configuration object.
* @return The builder.
*/
public BlockManagerParameters withConf(
final Configuration configuration) {
this.conf = configuration;
return this;
}

/**
* Sets the local dir allocator for round-robin disk allocation
* while creating files.
*
* @param dirAllocator The local dir allocator object.
* @return The builder.
*/
public BlockManagerParameters withLocalDirAllocator(
final LocalDirAllocator dirAllocator) {
this.localDirAllocator = dirAllocator;
return this;
}

/**
* Sets the max blocks count to be kept in cache at any time.
*
* @param blocksCount The max blocks count.
* @return The builder.
*/
public BlockManagerParameters withMaxBlocksCount(
final int blocksCount) {
this.maxBlocksCount = blocksCount;
return this;
}

/**
* Sets the duration tracker with statistics to update.
*
* @param factory The tracker factory object.
* @return The builder.
*/
public BlockManagerParameters withTrackerFactory(
final DurationTrackerFactory factory) {
this.trackerFactory = factory;
return this;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import javax.annotation.Nonnull;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -105,47 +107,33 @@ public abstract class CachingBlockManager extends BlockManager {
/**
* Constructs an instance of a {@code CachingBlockManager}.
*
* @param futurePool asynchronous tasks are performed in this pool.
* @param blockData information about each block of the underlying file.
* @param bufferPoolSize size of the in-memory cache in terms of number of blocks.
* @param prefetchingStatistics statistics for this stream.
* @param conf the configuration.
* @param localDirAllocator the local dir allocator instance.
* @param maxBlocksCount max blocks count to be kept in cache at any time.
* @param trackerFactory tracker with statistics to update.
* @param blockManagerParameters params for block manager.
* @throws IllegalArgumentException if bufferPoolSize is zero or negative.
*/
@SuppressWarnings("checkstyle:parameternumber")
public CachingBlockManager(
ExecutorServiceFuturePool futurePool,
BlockData blockData,
int bufferPoolSize,
PrefetchingStatistics prefetchingStatistics,
Configuration conf,
LocalDirAllocator localDirAllocator,
int maxBlocksCount,
DurationTrackerFactory trackerFactory) {
super(blockData);

Validate.checkPositiveInteger(bufferPoolSize, "bufferPoolSize");

this.futurePool = requireNonNull(futurePool);
this.bufferPoolSize = bufferPoolSize;
public CachingBlockManager(@Nonnull final BlockManagerParameters blockManagerParameters) {
super(blockManagerParameters.getBlockData());

Validate.checkPositiveInteger(blockManagerParameters.getBufferPoolSize(), "bufferPoolSize");

this.futurePool = requireNonNull(blockManagerParameters.getFuturePool());
this.bufferPoolSize = blockManagerParameters.getBufferPoolSize();
this.numCachingErrors = new AtomicInteger();
this.numReadErrors = new AtomicInteger();
this.cachingDisabled = new AtomicBoolean();
this.prefetchingStatistics = requireNonNull(prefetchingStatistics);
this.conf = requireNonNull(conf);
this.prefetchingStatistics = requireNonNull(
blockManagerParameters.getPrefetchingStatistics());
this.conf = requireNonNull(blockManagerParameters.getConf());

if (this.getBlockData().getFileSize() > 0) {
this.bufferPool = new BufferPool(bufferPoolSize, this.getBlockData().getBlockSize(),
this.prefetchingStatistics);
this.cache = this.createCache(maxBlocksCount, trackerFactory);
this.cache = this.createCache(blockManagerParameters.getMaxBlocksCount(),
blockManagerParameters.getTrackerFactory());
}

this.ops = new BlockOperations();
this.ops.setDebug(false);
this.localDirAllocator = localDirAllocator;
this.localDirAllocator = blockManagerParameters.getLocalDirAllocator();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,28 +22,17 @@
import java.io.IOException;
import java.nio.ByteBuffer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.impl.prefetch.BlockData;
import org.apache.hadoop.fs.impl.prefetch.BlockManagerParameters;
import org.apache.hadoop.fs.impl.prefetch.CachingBlockManager;
import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
import org.apache.hadoop.fs.impl.prefetch.Validate;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;

import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PREFETCH_MAX_BLOCKS_COUNT;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_MAX_BLOCKS_COUNT;

/**
* Provides access to S3 file one block at a time.
*/
public class S3ACachingBlockManager extends CachingBlockManager {

private static final Logger LOG = LoggerFactory.getLogger(
S3ACachingBlockManager.class);

/**
* Reader that reads from S3 file.
*/
Expand All @@ -52,32 +41,15 @@ public class S3ACachingBlockManager extends CachingBlockManager {
/**
* Constructs an instance of a {@code S3ACachingBlockManager}.
*
* @param futurePool asynchronous tasks are performed in this pool.
* @param blockManagerParameters params for block manager.
* @param reader reader that reads from S3 file.
* @param blockData information about each block of the S3 file.
* @param bufferPoolSize size of the in-memory cache in terms of number of blocks.
* @param streamStatistics statistics for this stream.
* @param conf the configuration.
* @param localDirAllocator the local dir allocator instance.
* @throws IllegalArgumentException if reader is null.
*/
public S3ACachingBlockManager(
ExecutorServiceFuturePool futurePool,
S3ARemoteObjectReader reader,
BlockData blockData,
int bufferPoolSize,
S3AInputStreamStatistics streamStatistics,
Configuration conf,
LocalDirAllocator localDirAllocator) {
@Nonnull final BlockManagerParameters blockManagerParameters,
final S3ARemoteObjectReader reader) {

super(futurePool,
blockData,
bufferPoolSize,
streamStatistics,
conf,
localDirAllocator,
conf.getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT),
streamStatistics);
super(blockManagerParameters);

Validate.checkNotNull(reader, "reader");

Expand Down
Loading