ODP-2645: TEZ-4460: Read timed out in shuffle handler - incorrect usage of EMPTY_LAST_CONTENT and channel write (apache#257) (Laszlo Bodor reviewed by Rajesh Balamohan, Syed Shameerur Rahman) (#22)
prabhjyotsingh authored Nov 20, 2024
1 parent 0f02614 commit ab1cb2e
Showing 2 changed files with 164 additions and 112 deletions.
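
The root cause is the Netty write semantics: Channel#write() only places a message in the channel's outbound buffer, and nothing reaches the socket until a flush. If the tail of a shuffle response stays buffered, the fetcher blocks and eventually fails with a read timeout, which is why the patch switches the affected calls to writeAndFlush(). A minimal sketch of the difference (illustration only, not code from this patch; 'ch' stands for any connected channel):

import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;

// Illustration only: contrasts buffered vs. flushed writes on a Netty channel.
final class WriteVsFlushSketch {

  static void buffered(Channel ch, byte[] payload) {
    // Queued in the outbound buffer; the peer sees nothing until a flush happens.
    ch.write(Unpooled.wrappedBuffer(payload));
  }

  static ChannelFuture sent(Channel ch, byte[] payload) {
    // Queued and flushed to the socket in one call.
    return ch.writeAndFlush(Unpooled.wrappedBuffer(payload));
  }
}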
@@ -126,7 +126,6 @@
 import io.netty.handler.codec.TooLongFrameException;
 import io.netty.handler.codec.http.DefaultFullHttpResponse;
 import io.netty.handler.codec.http.DefaultHttpResponse;
-import io.netty.handler.codec.http.FullHttpRequest;
 import io.netty.handler.codec.http.FullHttpResponse;
 import io.netty.handler.codec.http.HttpHeaders;
 import io.netty.handler.codec.http.HttpObjectAggregator;
@@ -304,21 +303,28 @@ public ReduceMapFileCount(ReduceContext rc) {
 
   @Override
   public void operationComplete(ChannelFuture future) throws Exception {
+    Channel ch = future.channel();
     if (!future.isSuccess()) {
-      future.channel().close();
+      ch.close();
       return;
     }
     int waitCount = this.reduceContext.getMapsToWait().decrementAndGet();
     if (waitCount == 0) {
       LOG.debug("Finished with all map outputs");
+      /*
+       * LastHttpContent.EMPTY_LAST_CONTENT can only be written when there are no remaining maps to send,
+       * this is the only time we can finish the HTTP response.
+       */
+      ch.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
       metrics.operationComplete(future);
       // Let the idle timer handler close keep-alive connections
       if (reduceContext.getKeepAlive()) {
-        ChannelPipeline pipeline = future.channel().pipeline();
+        ChannelPipeline pipeline = ch.pipeline();
         TimeoutHandler timeoutHandler =
             (TimeoutHandler) pipeline.get(TIMEOUT_HANDLER);
         timeoutHandler.setEnabledTimeout(true);
       } else {
-        future.channel().close();
+        ch.close();
       }
     } else {
       SHUFFLE.sendMap(reduceContext);
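
As the new comment in this hunk states, LastHttpContent.EMPTY_LAST_CONTENT terminates a chunked HTTP/1.1 response, so it must be written exactly once, after the last map output has completed, rather than at request-handling time. A minimal sketch of that pattern with a completion listener (hypothetical names; the real listener is the ReduceMapFileCount shown above):

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.handler.codec.http.LastHttpContent;
import java.util.concurrent.atomic.AtomicInteger;

// Illustration only: 'remaining' plays the role of reduceContext.getMapsToWait().
final class LastContentSketch implements ChannelFutureListener {
  private final AtomicInteger remaining;

  LastContentSketch(int mapsToSend) {
    this.remaining = new AtomicInteger(mapsToSend);
  }

  @Override
  public void operationComplete(ChannelFuture future) {
    Channel ch = future.channel();
    if (!future.isSuccess()) {
      ch.close();
      return;
    }
    if (remaining.decrementAndGet() == 0) {
      // Only now is the response complete; writing this earlier would end it prematurely.
      ch.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
    }
    // else: send the next map output and attach this listener again (omitted).
  }
}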
@@ -987,12 +993,11 @@ public void channelActive(ChannelHandlerContext ctx)
   @Override
   public void channelRead(ChannelHandlerContext ctx, Object message)
       throws Exception {
-    FullHttpRequest request = (FullHttpRequest) message;
+    HttpRequest request = (HttpRequest) message;
     handleRequest(ctx, request);
-    request.release();
   }
 
-  private void handleRequest(ChannelHandlerContext ctx, FullHttpRequest request)
+  private void handleRequest(ChannelHandlerContext ctx, HttpRequest request)
       throws IOException, Exception {
     if (request.getMethod() != GET) {
       sendError(ctx, METHOD_NOT_ALLOWED);
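
The cast change above goes together with dropping request.release(): a FullHttpRequest aggregates the body into a reference-counted ByteBuf and must be released, while a plain HttpRequest carries only the request line and headers. A minimal sketch of that distinction (illustration only, not the handler itself):

import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.util.ReferenceCountUtil;

// Illustration only: shows why release() applies to FullHttpRequest but not to a
// plain HttpRequest, which is not a ByteBufHolder.
final class RequestReleaseSketch {
  static void consume(Object message) {
    if (message instanceof FullHttpRequest) {
      FullHttpRequest full = (FullHttpRequest) message;
      try {
        // handle the aggregated request (headers + body) ...
      } finally {
        full.release(); // frees the aggregated body buffer
      }
    } else if (message instanceof HttpRequest) {
      // Headers only: nothing reference-counted to free.
      // ReferenceCountUtil.release() is a safe no-op for non-counted messages.
      ReferenceCountUtil.release(message);
    }
  }
}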
@@ -1108,13 +1113,9 @@ private void handleRequest(ChannelHandlerContext ctx, FullHttpRequest request)
     for (int i = 0; i < Math.min(maxSessionOpenFiles, mapIds.size()); i++) {
       ChannelFuture nextMap = sendMap(reduceContext);
       if(nextMap == null) {
-        // by this special message flushed, we can make sure the whole response is finished
-        ch.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
         return;
       }
     }
-    // by this special message flushed, we can make sure the whole response is finished
-    ch.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
   }
 
   private boolean deleteDagDirectories(Channel channel,
@@ -1395,7 +1396,7 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch,
     DataOutputBuffer dobRange = new DataOutputBuffer();
     // Indicate how many record to be written
     WritableUtils.writeVInt(dobRange, reduceRange.getLast() - reduceRange.getFirst() + 1);
-    ch.write(wrappedBuffer(dobRange.getData(), 0, dobRange.getLength()));
+    ch.writeAndFlush(wrappedBuffer(dobRange.getData(), 0, dobRange.getLength()));
     for (int reduce = reduceRange.getFirst(); reduce <= reduceRange.getLast(); reduce++) {
       TezIndexRecord index = outputInfo.getIndex(reduce);
       // Records are only valid if they have a non-zero part length
@@ -1410,7 +1411,7 @@
       DataOutputBuffer dob = new DataOutputBuffer();
       header.write(dob);
       // Free the memory needed to store the spill and index records
-      ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+      ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
     }
     outputInfo.finish();
 
@@ -1430,14 +1431,14 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch,
           rangeOffset, rangePartLength, manageOsCache, readaheadLength,
           readaheadPool, spillFile.getAbsolutePath(),
           shuffleBufferSize, shuffleTransferToAllowed);
-      writeFuture = ch.write(partition);
+      writeFuture = ch.writeAndFlush(partition);
     } else {
       // HTTPS cannot be done with zero copy.
       final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
           rangeOffset, rangePartLength, sslFileBufferSize,
           manageOsCache, readaheadLength, readaheadPool,
           spillFile.getAbsolutePath());
-      writeFuture = ch.write(chunk);
+      writeFuture = ch.writeAndFlush(chunk);
     }
     metrics.shuffleConnections.incr();
     metrics.shuffleOutputBytes.incr(rangePartLength); // optimistic