Skip to content

Commit

Permalink
[TEZ-4019] Modify Tez shuffle handler to use AuxiliaryLocalPathHandler instead of LocalDirAllocator
Browse files Browse the repository at this point in the history
  • Loading branch information
Jonathan Eagles committed May 13, 2024
1 parent 7a9211e commit 199958f
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
Expand Down Expand Up @@ -903,8 +902,6 @@ class Shuffle extends ChannelInboundHandlerAdapter {
private static final int ALLOWED_CONCURRENCY = 16;
private final Configuration conf;
private final IndexCache indexCache;
private final LocalDirAllocator lDirAlloc =
new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS);
private int port;
private final LoadingCache<AttemptPathIdentifier, AttemptPathInfo> pathCache =
CacheBuilder.newBuilder().expireAfterAccess(EXPIRE_AFTER_ACCESS_MINUTES,
Expand Down Expand Up @@ -937,10 +934,10 @@ public AttemptPathInfo load(AttemptPathIdentifier key) throws
Exception {
String base = getBaseLocation(key.jobId, key.dagId, key.user);
String attemptBase = base + key.attemptId;
Path indexFileName = lDirAlloc.getLocalPathToRead(
attemptBase + Path.SEPARATOR + INDEX_FILE_NAME, conf);
Path mapOutputFileName = lDirAlloc.getLocalPathToRead(
attemptBase + Path.SEPARATOR + DATA_FILE_NAME, conf);
Path indexFileName = getAuxiliaryLocalPathHandler()
.getLocalPathForRead(attemptBase + "/" + INDEX_FILE_NAME);
Path mapOutputFileName = getAuxiliaryLocalPathHandler()
.getLocalPathForRead(attemptBase + "/" + DATA_FILE_NAME);

LOG.debug("Loaded : {} via loader", key);
return new AttemptPathInfo(indexFileName, mapOutputFileName);
Expand Down Expand Up @@ -1154,7 +1151,7 @@ private boolean deleteDagDirectories(Channel channel,
String base = getDagLocation(jobQ.get(0), dagIdQ.get(0), userRsrc.get(jobQ.get(0)));
try {
FileContext lfc = FileContext.getLocalFSFileContext();
for(Path dagPath : lDirAlloc.getAllLocalPathsToRead(base, conf)) {
for(Path dagPath : getAuxiliaryLocalPathHandler().getAllLocalPathsForRead(base)) {
lfc.delete(dagPath, true);
}
} catch (IOException e) {
Expand Down Expand Up @@ -1196,7 +1193,7 @@ private boolean deleteTaskAttemptDirectories(Channel channel, List<String> taskA
String baseStr = getBaseLocation(jobQ.get(0), dagIdQ.get(0), userRsrc.get(jobQ.get(0)));
try {
FileSystem fs = FileSystem.getLocal(conf).getRaw();
for (Path basePath : lDirAlloc.getAllLocalPathsToRead(baseStr, conf)) {
for (Path basePath : getAuxiliaryLocalPathHandler().getAllLocalPathsForRead(baseStr)) {
for (FileStatus fileStatus : fs.listStatus(basePath)) {
Path taskAttemptPath = fileStatus.getPath();
if (taskAttemptPath.getName().startsWith(taskAttemptId)) {
Expand Down Expand Up @@ -1293,7 +1290,7 @@ private String getBaseLocation(String jobId, String dagId, String user) {
private void deleteTaskDirsOfVertex(String jobId, String dagId, String vertexId, String user) throws IOException {
String baseStr = getBaseLocation(jobId, dagId, user);
FileContext lfc = FileContext.getLocalFSFileContext();
for(Path dagPath : lDirAlloc.getAllLocalPathsToRead(baseStr, conf)) {
for(Path dagPath : getAuxiliaryLocalPathHandler().getAllLocalPathsForRead(baseStr)) {
RemoteIterator<FileStatus> status = lfc.listStatus(dagPath);
final JobID jobID = JobID.forName(jobId);
String taskDirPrefix = String.format("attempt%s_%s_%s_",
Expand Down
Loading

0 comments on commit 199958f

Please sign in to comment.