From 6cbe0f404c4e2cf438eddb9f656ece80e5a56659 Mon Sep 17 00:00:00 2001
From: Jiajun Mao
Date: Mon, 2 Sep 2024 17:42:48 -0500
Subject: [PATCH] include colIdx in DFS packet header
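
MLEC repair writes need to know which stripe column the incoming data
belongs to, so that BlockReceiver can hand the repair payload to the
matching on-disk dnode instead of hard-coding column 0.

Thread an optional column index through the write path:

* DFSPacket gains a constructor overload that carries a nullable
  colIdx and stamps it on every header it writes out.
* PacketHeaderProto gains "optional int32 colIdx = 6", so untagged
  headers keep their current wire format.
* BlockReceiver reads the index back via header.getColIdx() and passes
  it to Tools#writeRepairData in place of the previous literal 0.
* StripedWriter/StripedBlockWriter gain column-aware overloads of
  transferData2Targets()/transferData2Target() for the erasure-coding
  reconstruction path.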
---
 .../org/apache/hadoop/hdfs/DFSPacket.java        | 37 +++++++++++++++-
 .../protocol/datatransfer/PacketHeader.java      | 36 ++++++++++++++
 .../src/main/proto/datatransfer.proto            |  1 +
 .../hdfs/server/datanode/BlockReceiver.java      |  5 ++-
 .../erasurecode/StripedBlockWriter.java          | 44 +++++++++++++++++
 .../datanode/erasurecode/StripedWriter.java      | 19 ++++++++
 6 files changed, 139 insertions(+), 3 deletions(-)
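
NOTE (not part of the commit message): a minimal sketch of the header
round-trip this patch enables; the literal sizes below are
illustrative, not taken from the patch.

    // Tag a header with stripe column 2, then read the tag back.
    PacketHeader header = new PacketHeader(
        4 + 512,  // packetLen (must be >= 4: length prefix + payload)
        0L,       // offsetInBlock
        1L,       // seqno
        false,    // lastPacketInBlock
        512,      // dataLen
        false,    // syncBlock
        2);       // colIdx; pass null to leave the field off the wire
    assert header.getColIdx() == 2;

Caveat: colIdx is a proto2 optional int32, so an untagged header reads
back as 0 through getColIdx(), which is indistinguishable from a real
column 0 unless the receiver also checks hasColIdx() on the underlying
proto message.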
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
index d3b316cedee73..c1542bdce1b47 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
@@ -74,6 +74,9 @@ public class DFSPacket {
   private int traceParentsUsed;
   private Span span;
 
+  // MLEC: stripe column index carried by this packet; null when untagged.
+  private Integer colIdx;
+
   /**
    * Create a new packet.
    *
@@ -100,6 +103,34 @@ public DFSPacket(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno,
     maxChunks = chunksPerPkt;
   }
 
+  /**
+   * Create a new packet tagged with a stripe column index.
+   *
+   * @param buf the buffer storing data and checksums
+   * @param chunksPerPkt maximum number of chunks per packet.
+   * @param offsetInBlock offset in bytes into the HDFS block.
+   * @param seqno the sequence number of this packet
+   * @param checksumSize the size of checksum
+   * @param lastPacketInBlock if this is the last packet
+   * @param colIdx stripe column index this packet belongs to, or null
+   */
+  public DFSPacket(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno,
+      int checksumSize, boolean lastPacketInBlock, Integer colIdx) {
+    this.lastPacketInBlock = lastPacketInBlock;
+    this.numChunks = 0;
+    this.offsetInBlock = offsetInBlock;
+    this.seqno = seqno;
+
+    this.buf = buf;
+    this.colIdx = colIdx;
+
+    checksumStart = PacketHeader.PKT_MAX_HEADER_LEN;
+    checksumPos = checksumStart;
+    dataStart = checksumStart + (chunksPerPkt * checksumSize);
+    dataPos = dataStart;
+    maxChunks = chunksPerPkt;
+  }
+
   /**
    * Write data to this packet.
    *
@@ -165,7 +196,7 @@ public synchronized void writeTo(DataOutputStream stm) throws IOException {
     final int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen;
 
     PacketHeader header = new PacketHeader(
-        pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock);
+        pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock, colIdx);
 
     if (checksumPos != dataStart) {
       // Move the checksum to cover the gap. This can happen for the last
@@ -364,4 +395,8 @@ public void setSpan(Span span) {
   public Span getSpan() {
     return span;
   }
+
+  private Integer getColIdx() {
+    return this.colIdx;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
index ef1a3658305b1..e75d2920db68b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
@@ -69,6 +69,37 @@ public class PacketHeader {
   public PacketHeader() {
   }
 
+  public PacketHeader(int packetLen, long offsetInBlock, long seqno,
+      boolean lastPacketInBlock, int dataLen, boolean syncBlock,
+      Integer colIdx) {
+    this.packetLen = packetLen;
+    Preconditions.checkArgument(packetLen >= Ints.BYTES,
+        "packet len %s should always be at least 4 bytes",
+        packetLen);
+
+    PacketHeaderProto.Builder builder = PacketHeaderProto.newBuilder()
+        .setOffsetInBlock(offsetInBlock)
+        .setSeqno(seqno)
+        .setLastPacketInBlock(lastPacketInBlock)
+        .setDataLen(dataLen);
+
+    if (colIdx != null) {
+      // Only tag striped (MLEC) writes; an untagged header keeps the
+      // existing wire format.
+      builder.setColIdx(colIdx);
+    }
+
+    if (syncBlock) {
+      // Only set syncBlock if it is specified.
+      // This is wire-incompatible with Hadoop 2.0.0-alpha due to HDFS-3721
+      // because it changes the length of the packet header, and BlockReceiver
+      // in that version did not support variable-length headers.
+      builder.setSyncBlock(true);
+    }
+
+    proto = builder.build();
+  }
+
   public PacketHeader(int packetLen, long offsetInBlock, long seqno,
       boolean lastPacketInBlock, int dataLen, boolean syncBlock) {
     this.packetLen = packetLen;
@@ -117,6 +148,11 @@ public boolean getSyncBlock() {
     return proto.getSyncBlock();
   }
 
+  public Integer getColIdx() {
+    // proto2 semantics: reads back as 0 if the writer never set the field.
+    return proto.getColIdx();
+  }
+
   @Override
   public String toString() {
     return "PacketHeader with packetLen=" + packetLen +
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
index a805729d90504..25374e1800675 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
@@ -241,6 +241,7 @@ message PacketHeaderProto {
   required bool lastPacketInBlock = 3;
   required sfixed32 dataLen = 4;
   optional bool syncBlock = 5 [default = false];
+  optional int32 colIdx = 6;
 }
 
 // Status is a 4-bit enum
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 18c77784a34c2..a27dddac4978a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -717,7 +717,7 @@ private int receivePacket() throws IOException {
       List<DnodeAttributes> dnodes = zfsTools.getFailedChunks("pool");
       DnodeAttributes reconDnode = null;
       for (DnodeAttributes dn : dnodes) {
-        if (dn.path.contains(String.valueOf(block.getBlockId()))) {
+        if (dn.path.contains(String.valueOf(block.getBlockId())) && !dn.path.contains("meta")) {
           reconDnode = dn;
         }
       }
@@ -728,8 +728,9 @@
         throw new IllegalStateException("Cannot find reconstruction dnode for block "
             + block.getBlockId());
       }
 
+      LOG.info("Writing to dnode at colIdx {}", header.getColIdx());
       // TODO: pass in the column index information into the DFS packet
-      new Tools().writeRepairData("pool", reconDnode, 0, 0, dataBuf.array());
+      new Tools().writeRepairData("pool", reconDnode, 0, header.getColIdx(), dataBuf.array());
     }
 
     if (onDiskLen < offsetInBlock) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
@@ ... @@
+  // MLEC: copy of transferData2Target(byte[]) that stamps each outgoing
+  // packet with the stripe column index.
+  void transferData2Target(byte[] packetBuf, int colIdx) throws IOException {
+    if (targetBuffer.remaining() == 0) {
+      return;
+    }
+
+    if (targetBuffer.isDirect()) {
+      ByteBuffer directCheckSumBuf =
+          BUFFER_POOL.getBuffer(true, stripedWriter.getChecksumBuf().length);
+      stripedWriter.getChecksum().calculateChunkedSums(
+          targetBuffer, directCheckSumBuf);
+      directCheckSumBuf.get(stripedWriter.getChecksumBuf());
+      BUFFER_POOL.putBuffer(directCheckSumBuf);
+    } else {
+      stripedWriter.getChecksum().calculateChunkedSums(
+          targetBuffer.array(), 0, targetBuffer.remaining(),
+          stripedWriter.getChecksumBuf(), 0);
+    }
+
+    int ckOff = 0;
+    while (targetBuffer.remaining() > 0) {
+      DFSPacket packet = new DFSPacket(packetBuf,
+          stripedWriter.getMaxChunksPerPacket(),
+          blockOffset4Target, seqNo4Target++,
+          stripedWriter.getChecksumSize(), false, colIdx);
+      int maxBytesToPacket = stripedWriter.getMaxChunksPerPacket()
+          * stripedWriter.getBytesPerChecksum();
+      int toWrite = targetBuffer.remaining() > maxBytesToPacket ?
+          maxBytesToPacket : targetBuffer.remaining();
+      int ckLen = ((toWrite - 1) / stripedWriter.getBytesPerChecksum() + 1)
+          * stripedWriter.getChecksumSize();
+      packet.writeChecksum(stripedWriter.getChecksumBuf(), ckOff, ckLen);
+      ckOff += ckLen;
+      packet.writeData(targetBuffer, toWrite);
+
+      // Send packet
+      packet.writeTo(targetOutputStream);
+
+      blockOffset4Target += toWrite;
+      stripedWriter.getReconstructor().incrBytesWritten(toWrite);
+    }
+  }
+
   // send an empty packet to mark the end of the block
   void endTargetBlock(byte[] packetBuf) throws IOException {
     DFSPacket packet = new DFSPacket(packetBuf, 0,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
index d71a34de30823..1ec74f85193ae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
@@ -165,6 +165,25 @@ int transferData2Targets() {
     return nSuccess;
   }
 
+  // MLEC: same as transferData2Targets(), but tags every packet with columnIdx.
+  int transferData2Targets(int columnIdx) {
+    int nSuccess = 0;
+    for (int i = 0; i < targets.length; i++) {
+      if (targetsStatus[i]) {
+        boolean success = false;
+        try {
+          writers[i].transferData2Target(packetBuf, columnIdx);
+          nSuccess++;
+          success = true;
+        } catch (IOException e) {
+          LOG.error("Error while transferring data to target", e);
+        }
+        targetsStatus[i] = success;
+      }
+    }
+    return nSuccess;
+  }
+
   /**
    * Send an empty packet to mark the end of the block.
    */
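
NOTE: the patch introduces the column-aware overloads but no caller of
StripedWriter#transferData2Targets(int) yet; a reconstruction driver
would have to pass the column index explicitly, along these lines
(hypothetical call site, names assumed):

    // In the EC reconstructor, once the target stripe column is known:
    int nSuccess = stripedWriter.transferData2Targets(columnIdx);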