Skip to content

Commit

Permalink
HADOOP-18959 Use builder for prefetch CachingBlockManager
Browse files Browse the repository at this point in the history
  • Loading branch information
virajjasani committed Oct 30, 2023
1 parent a079f62 commit 633cbd4
Show file tree
Hide file tree
Showing 6 changed files with 348 additions and 155 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hadoop.fs.impl.prefetch;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;

/**
* This class is used to provide params to {@link BlockManager}.
*/
@InterfaceAudience.Private
public class BlockManagerParams {

/**
* Asynchronous tasks are performed in this pool.
*/
private final ExecutorServiceFuturePool futurePool;
/**
* Information about each block of the underlying file.
*/
private final BlockData blockData;
/**
* Size of the in-memory cache in terms of number of blocks.
*/
private final int bufferPoolSize;
/**
* Statistics for the stream.
*/
private final PrefetchingStatistics prefetchingStatistics;
/**
* The configuration object.
*/
private final Configuration conf;
/**
* The local dir allocator instance.
*/
private final LocalDirAllocator localDirAllocator;
/**
* Max blocks count to be kept in cache at any time.
*/
private final int maxBlocksCount;
/**
* Tracker with statistics to update.
*/
private final DurationTrackerFactory trackerFactory;

private BlockManagerParams(ExecutorServiceFuturePool futurePool, BlockData blockData,
int bufferPoolSize, PrefetchingStatistics prefetchingStatistics, Configuration conf,
LocalDirAllocator localDirAllocator, int maxBlocksCount,
DurationTrackerFactory trackerFactory) {
this.futurePool = futurePool;
this.blockData = blockData;
this.bufferPoolSize = bufferPoolSize;
this.prefetchingStatistics = prefetchingStatistics;
this.conf = conf;
this.localDirAllocator = localDirAllocator;
this.maxBlocksCount = maxBlocksCount;
this.trackerFactory = trackerFactory;
}

public ExecutorServiceFuturePool getFuturePool() {
return futurePool;
}

public BlockData getBlockData() {
return blockData;
}

public int getBufferPoolSize() {
return bufferPoolSize;
}

public PrefetchingStatistics getPrefetchingStatistics() {
return prefetchingStatistics;
}

public Configuration getConf() {
return conf;
}

public LocalDirAllocator getLocalDirAllocator() {
return localDirAllocator;
}

public int getMaxBlocksCount() {
return maxBlocksCount;
}

public DurationTrackerFactory getTrackerFactory() {
return trackerFactory;
}

public static class BlockManagerParamsBuilder {
private ExecutorServiceFuturePool futurePool;
private BlockData blockData;
private int bufferPoolSize;
private PrefetchingStatistics prefetchingStatistics;
private Configuration conf;
private LocalDirAllocator localDirAllocator;
private int maxBlocksCount;
private DurationTrackerFactory trackerFactory;

public BlockManagerParamsBuilder setFuturePool(ExecutorServiceFuturePool futurePool) {
this.futurePool = futurePool;
return this;
}

public BlockManagerParamsBuilder setBlockData(BlockData blockData) {
this.blockData = blockData;
return this;
}

public BlockManagerParamsBuilder setBufferPoolSize(int bufferPoolSize) {
this.bufferPoolSize = bufferPoolSize;
return this;
}

public BlockManagerParamsBuilder setPrefetchingStatistics(
PrefetchingStatistics prefetchingStatistics) {
this.prefetchingStatistics = prefetchingStatistics;
return this;
}

public BlockManagerParamsBuilder setConf(Configuration conf) {
this.conf = conf;
return this;
}

public BlockManagerParamsBuilder setLocalDirAllocator(LocalDirAllocator localDirAllocator) {
this.localDirAllocator = localDirAllocator;
return this;
}

public BlockManagerParamsBuilder setMaxBlocksCount(int maxBlocksCount) {
this.maxBlocksCount = maxBlocksCount;
return this;
}

public BlockManagerParamsBuilder setTrackerFactory(DurationTrackerFactory trackerFactory) {
this.trackerFactory = trackerFactory;
return this;
}

public BlockManagerParams build() {
return new BlockManagerParams(futurePool, blockData, bufferPoolSize, prefetchingStatistics,
conf, localDirAllocator, maxBlocksCount, trackerFactory);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import javax.annotation.Nonnull;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -105,47 +107,32 @@ public abstract class CachingBlockManager extends BlockManager {
/**
* Constructs an instance of a {@code CachingBlockManager}.
*
* @param futurePool asynchronous tasks are performed in this pool.
* @param blockData information about each block of the underlying file.
* @param bufferPoolSize size of the in-memory cache in terms of number of blocks.
* @param prefetchingStatistics statistics for this stream.
* @param conf the configuration.
* @param localDirAllocator the local dir allocator instance.
* @param maxBlocksCount max blocks count to be kept in cache at any time.
* @param trackerFactory tracker with statistics to update.
* @param blockManagerParams params for block manager.
* @throws IllegalArgumentException if bufferPoolSize is zero or negative.
*/
@SuppressWarnings("checkstyle:parameternumber")
public CachingBlockManager(
ExecutorServiceFuturePool futurePool,
BlockData blockData,
int bufferPoolSize,
PrefetchingStatistics prefetchingStatistics,
Configuration conf,
LocalDirAllocator localDirAllocator,
int maxBlocksCount,
DurationTrackerFactory trackerFactory) {
super(blockData);

Validate.checkPositiveInteger(bufferPoolSize, "bufferPoolSize");

this.futurePool = requireNonNull(futurePool);
this.bufferPoolSize = bufferPoolSize;
public CachingBlockManager(@Nonnull final BlockManagerParams blockManagerParams) {
super(blockManagerParams.getBlockData());

Validate.checkPositiveInteger(blockManagerParams.getBufferPoolSize(), "bufferPoolSize");

this.futurePool = requireNonNull(blockManagerParams.getFuturePool());
this.bufferPoolSize = blockManagerParams.getBufferPoolSize();
this.numCachingErrors = new AtomicInteger();
this.numReadErrors = new AtomicInteger();
this.cachingDisabled = new AtomicBoolean();
this.prefetchingStatistics = requireNonNull(prefetchingStatistics);
this.conf = requireNonNull(conf);
this.prefetchingStatistics = requireNonNull(blockManagerParams.getPrefetchingStatistics());
this.conf = requireNonNull(blockManagerParams.getConf());

if (this.getBlockData().getFileSize() > 0) {
this.bufferPool = new BufferPool(bufferPoolSize, this.getBlockData().getBlockSize(),
this.prefetchingStatistics);
this.cache = this.createCache(maxBlocksCount, trackerFactory);
this.cache = this.createCache(blockManagerParams.getMaxBlocksCount(),
blockManagerParams.getTrackerFactory());
}

this.ops = new BlockOperations();
this.ops.setDebug(false);
this.localDirAllocator = localDirAllocator;
this.localDirAllocator = blockManagerParams.getLocalDirAllocator();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,28 +22,17 @@
import java.io.IOException;
import java.nio.ByteBuffer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.impl.prefetch.BlockData;
import org.apache.hadoop.fs.impl.prefetch.BlockManagerParams;
import org.apache.hadoop.fs.impl.prefetch.CachingBlockManager;
import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
import org.apache.hadoop.fs.impl.prefetch.Validate;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;

import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PREFETCH_MAX_BLOCKS_COUNT;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_MAX_BLOCKS_COUNT;

/**
* Provides access to S3 file one block at a time.
*/
public class S3ACachingBlockManager extends CachingBlockManager {

private static final Logger LOG = LoggerFactory.getLogger(
S3ACachingBlockManager.class);

/**
* Reader that reads from S3 file.
*/
Expand All @@ -52,32 +41,15 @@ public class S3ACachingBlockManager extends CachingBlockManager {
/**
* Constructs an instance of a {@code S3ACachingBlockManager}.
*
* @param futurePool asynchronous tasks are performed in this pool.
* @param blockManagerParams params for block manager.
* @param reader reader that reads from S3 file.
* @param blockData information about each block of the S3 file.
* @param bufferPoolSize size of the in-memory cache in terms of number of blocks.
* @param streamStatistics statistics for this stream.
* @param conf the configuration.
* @param localDirAllocator the local dir allocator instance.
* @throws IllegalArgumentException if reader is null.
*/
public S3ACachingBlockManager(
ExecutorServiceFuturePool futurePool,
S3ARemoteObjectReader reader,
BlockData blockData,
int bufferPoolSize,
S3AInputStreamStatistics streamStatistics,
Configuration conf,
LocalDirAllocator localDirAllocator) {
@Nonnull final BlockManagerParams blockManagerParams,
final S3ARemoteObjectReader reader) {

super(futurePool,
blockData,
bufferPoolSize,
streamStatistics,
conf,
localDirAllocator,
conf.getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT),
streamStatistics);
super(blockManagerParams);

Validate.checkNotNull(reader, "reader");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,24 @@

import java.io.IOException;

import javax.annotation.Nonnull;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.impl.prefetch.BlockData;
import org.apache.hadoop.fs.impl.prefetch.BlockManager;
import org.apache.hadoop.fs.impl.prefetch.BlockManagerParams;
import org.apache.hadoop.fs.impl.prefetch.BufferData;
import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
import org.apache.hadoop.fs.impl.prefetch.FilePosition;
import org.apache.hadoop.fs.s3a.S3AInputStream;
import org.apache.hadoop.fs.s3a.S3AReadOpContext;
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;

import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PREFETCH_MAX_BLOCKS_COUNT;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_MAX_BLOCKS_COUNT;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCK_ACQUIRE_AND_READ;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration;

Expand Down Expand Up @@ -80,13 +83,19 @@ public S3ACachingInputStream(

this.numBlocksToPrefetch = this.getContext().getPrefetchBlockCount();
int bufferPoolSize = this.numBlocksToPrefetch + 1;
this.blockManager = this.createBlockManager(
this.getContext().getFuturePool(),
this.getReader(),
this.getBlockData(),
bufferPoolSize,
conf,
localDirAllocator);
BlockManagerParams.BlockManagerParamsBuilder blockManagerParamsBuilder =
new BlockManagerParams.BlockManagerParamsBuilder();
blockManagerParamsBuilder.setFuturePool(this.getContext().getFuturePool());
blockManagerParamsBuilder.setBlockData(this.getBlockData());
blockManagerParamsBuilder.setBufferPoolSize(bufferPoolSize);
blockManagerParamsBuilder.setConf(conf);
blockManagerParamsBuilder.setLocalDirAllocator(localDirAllocator);
blockManagerParamsBuilder.setMaxBlocksCount(
conf.getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT));
blockManagerParamsBuilder.setPrefetchingStatistics(getS3AStreamStatistics());
blockManagerParamsBuilder.setTrackerFactory(getS3AStreamStatistics());
this.blockManager = this.createBlockManager(blockManagerParamsBuilder.build(),
this.getReader());
int fileSize = (int) s3Attributes.getLen();
LOG.debug("Created caching input stream for {} (size = {})", this.getName(),
fileSize);
Expand Down Expand Up @@ -180,18 +189,8 @@ public String toString() {
}

protected BlockManager createBlockManager(
ExecutorServiceFuturePool futurePool,
S3ARemoteObjectReader reader,
BlockData blockData,
int bufferPoolSize,
Configuration conf,
LocalDirAllocator localDirAllocator) {
return new S3ACachingBlockManager(futurePool,
reader,
blockData,
bufferPoolSize,
getS3AStreamStatistics(),
conf,
localDirAllocator);
@Nonnull final BlockManagerParams blockManagerParams,
final S3ARemoteObjectReader reader) {
return new S3ACachingBlockManager(blockManagerParams, reader);
}
}
Loading

0 comments on commit 633cbd4

Please sign in to comment.