Skip to content

Commit

Permalink
HDFS-17080. fix ec connection leak. (#5807)
Browse files Browse the repository at this point in the history
  • Loading branch information
harris233 authored Jan 6, 2025
1 parent f65747d commit 815ca41
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -253,13 +253,19 @@ private int readToBuffer(BlockReader blockReader,
strategy.getReadBuffer().clear();
// we want to remember which block replicas we have tried
corruptedBlocks.addCorruptedBlock(currentBlock, currentNode);
if (blockReader != null) {
blockReader.close();
}
throw ce;
} catch (IOException e) {
DFSClient.LOG.warn("Exception while reading from "
+ currentBlock + " of " + dfsStripedInputStream.getSrc() + " from "
+ currentNode, e);
//Clear buffer to make next decode success
strategy.getReadBuffer().clear();
if (blockReader != null) {
blockReader.close();
}
throw e;
}
}
Expand Down Expand Up @@ -329,21 +335,26 @@ boolean readChunk(final LocatedBlock block, int chunkIndex)
* read the whole stripe. do decoding if necessary
*/
void readStripe() throws IOException {
for (int i = 0; i < dataBlkNum; i++) {
if (alignedStripe.chunks[i] != null &&
alignedStripe.chunks[i].state != StripingChunk.ALLZERO) {
if (!readChunk(targetBlocks[i], i)) {
alignedStripe.missingChunksNum++;
try {
for (int i = 0; i < dataBlkNum; i++) {
if (alignedStripe.chunks[i] != null &&
alignedStripe.chunks[i].state != StripingChunk.ALLZERO) {
if (!readChunk(targetBlocks[i], i)) {
alignedStripe.missingChunksNum++;
}
}
}
}
// There are missing block locations at this stage. Thus we need to read
// the full stripe and one more parity block.
if (alignedStripe.missingChunksNum > 0) {
checkMissingBlocks();
readDataForDecoding();
// read parity chunks
readParityChunks(alignedStripe.missingChunksNum);
// There are missing block locations at this stage. Thus we need to read
// the full stripe and one more parity block.
if (alignedStripe.missingChunksNum > 0) {
checkMissingBlocks();
readDataForDecoding();
// read parity chunks
readParityChunks(alignedStripe.missingChunksNum);
}
} catch (IOException e) {
dfsStripedInputStream.close();
throw e;
}
// TODO: for a full stripe we can start reading (dataBlkNum + 1) chunks

Expand Down Expand Up @@ -385,7 +396,8 @@ void readStripe() throws IOException {
}
} catch (InterruptedException ie) {
String err = "Read request interrupted";
DFSClient.LOG.error(err);
DFSClient.LOG.error(err, ie);
dfsStripedInputStream.close();
clearFutures();
// Don't decode if read interrupted
throw new InterruptedIOException(err);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,12 @@ public LineRecordReader(Configuration job, FileSplit split,
// because we always (except the last split) read one extra line in
// next() method.
if (start != 0) {
start += in.readLine(new Text(), 0, maxBytesToConsume(start));
try {
start += in.readLine(new Text(), 0, maxBytesToConsume(start));
} catch (Exception e) {
close();
throw e;
}
}
this.pos = start;
}
Expand Down

0 comments on commit 815ca41

Please sign in to comment.