From 5c69f3f03175f3eeb5e2cd95c0a96ef74caa3b1a Mon Sep 17 00:00:00 2001
From: "jinbiao.sun"
Date: Thu, 28 Sep 2023 16:24:15 +0800
Subject: [PATCH 1/2] HDFS-16968. Recover two replicas when 2-replication
 write pipeline fails

---
 .../ReplaceDatanodeOnFailure.java             |  2 +-
 .../hdfs/TestReplaceDatanodeOnFailure.java    | 56 ++++++++++++++++++-
 2 files changed, 55 insertions(+), 3 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ReplaceDatanodeOnFailure.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ReplaceDatanodeOnFailure.java
index 7099c28db1825..799ef98696227 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ReplaceDatanodeOnFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ReplaceDatanodeOnFailure.java
@@ -42,7 +42,7 @@ public class ReplaceDatanodeOnFailure {
     public boolean satisfy(final short replication,
         final DatanodeInfo[] existings, final int n, final boolean isAppend,
         final boolean isHflushed) {
-      return replication >= 3 &&
+      return replication >= 2 &&
           (n <= (replication / 2) || isAppend || isHflushed);
     }
   };
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplaceDatanodeOnFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplaceDatanodeOnFailure.java
index 5015722c61f7d..a2b451f04183b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplaceDatanodeOnFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplaceDatanodeOnFailure.java
@@ -86,12 +86,12 @@ public void testDefaultPolicy() throws Exception {
           final int half = replication/2;
           final boolean enoughReplica = replication <= nExistings;
           final boolean noReplica = nExistings == 0;
-          final boolean replicationL3 = replication < 3;
+          final boolean replicationL2 = replication < 2;
           final boolean existingsLEhalf = nExistings <= half;
           final boolean isAH = isAppend[i] || isHflushed[j];
           final boolean expected;
-          if (enoughReplica || noReplica || replicationL3) {
+          if (enoughReplica || noReplica || replicationL2) {
             expected = false;
           } else {
             expected = isAH || existingsLEhalf;
           }
@@ -114,6 +114,50 @@ public void testDefaultPolicy() throws Exception {
     }
   }
 
+  /** Test replace datanode on failure with 2-replication file. */
+  @Test
+  public void testReplaceDatanodeOnFailureWith2Replications() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    // do not consider load factor when selecting a data node
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY,
+        false);
+    //set policy to DEFAULT
+    ReplaceDatanodeOnFailure.write(Policy.DEFAULT, false, conf);
+
+    final int repNum = 2;
+    final String[] racks = new String[repNum];
+    Arrays.fill(racks, RACK0);
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf
+        ).racks(racks).numDataNodes(repNum).build();
+
+    try {
+      cluster.waitActive();
+      final DistributedFileSystem fs = cluster.getFileSystem();
+      final Path dir = new Path(DIR);
+      final SlowWriter[] slowwriter = new SlowWriter[1];
+      slowwriter[0] = new SlowWriter(fs, new Path(dir, "file-rep2"), 200L, (short) 2);
+      slowwriter[0].start();
+
+      //start new datanodes
+      cluster.startDataNodes(conf, 1, true, null, new String[]{RACK1});
+      cluster.waitActive();
+      // wait for first block reports for up to 10 seconds
+      cluster.waitFirstBRCompleted(0, 10000);
+
+      //stop an old datanode
+      MiniDFSCluster.DataNodeProperties dnprop = cluster.stopDataNode(
+          AppendTestUtil.nextInt(repNum));
+
+      sleepSeconds(3);
+      Assert.assertEquals(repNum, slowwriter[0].out.getCurrentBlockReplication());
+
+      slowwriter[0].interruptRunning();
+      slowwriter[0].joinAndClose();
+    } finally {
+      if (cluster != null) {cluster.shutdown();}
+    }
+  }
+
   /** Test replace datanode on failure. */
   @Test
   public void testReplaceDatanodeOnFailure() throws Exception {
@@ -236,6 +280,14 @@ static class SlowWriter extends Thread {
       this.sleepms = sleepms;
     }
 
+    SlowWriter(DistributedFileSystem fs, Path filepath, final long sleepms,
+        short replication) throws IOException {
+      super(SlowWriter.class.getSimpleName() + ":" + filepath);
+      this.filepath = filepath;
+      this.out = (HdfsDataOutputStream)fs.create(filepath, replication);
+      this.sleepms = sleepms;
+    }
+
     @Override
     public void run() {
       int i = 0;
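To see what the relaxed guard buys, here is a standalone sketch of the DEFAULT predicate before and after the change (the class and method names below are invented for this note; only the two boolean expressions are taken from the first hunk). With a 2-replication pipeline that has lost one datanode (n = 1, no append/hflush), the old guard r >= 3 rejects recovery outright, while the new guard r >= 2 accepts it since n <= floor(r/2) = 1.

    // Hypothetical demo class; the real predicate lives in
    // ReplaceDatanodeOnFailure.CONDITION_DEFAULT.
    public class DefaultPolicyDemo {
      // Guard before the patch: never fires for replication < 3.
      static boolean satisfyOld(short replication, int nExistings,
          boolean isAppend, boolean isHflushed) {
        return replication >= 3 &&
            (nExistings <= (replication / 2) || isAppend || isHflushed);
      }

      // Guard after the patch: also covers 2-replication pipelines.
      static boolean satisfyNew(short replication, int nExistings,
          boolean isAppend, boolean isHflushed) {
        return replication >= 2 &&
            (nExistings <= (replication / 2) || isAppend || isHflushed);
      }

      public static void main(String[] args) {
        // A 2-replication write loses one of its two datanodes.
        System.out.println(satisfyOld((short) 2, 1, false, false)); // false
        System.out.println(satisfyNew((short) 2, 1, false, false)); // true
      }
    }

Note that for replication = 1 both guards still return false, so single-replica writes keep their old behavior; only the 2-replication case changes.
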
From 412b38a290d68e9af00d87473fc8b863be1debaa Mon Sep 17 00:00:00 2001
From: "jinbiao.sun"
Date: Thu, 28 Sep 2023 16:34:10 +0800
Subject: [PATCH 2/2] HDFS-16968. Update the corresponding comment

---
 .../hdfs/protocol/datatransfer/ReplaceDatanodeOnFailure.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ReplaceDatanodeOnFailure.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ReplaceDatanodeOnFailure.java
index 799ef98696227..2e9256e3bf7cf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ReplaceDatanodeOnFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ReplaceDatanodeOnFailure.java
@@ -34,7 +34,7 @@ public class ReplaceDatanodeOnFailure {
    * DEFAULT condition:
    *   Let r be the replication number.
    *   Let n be the number of existing datanodes.
-   *   Add a new datanode only if r >= 3 and either
+   *   Add a new datanode only if r >= 2 and either
    *   (1) floor(r/2) >= n or (2) the block is hflushed/appended.
    */
   private static final Condition CONDITION_DEFAULT = new Condition() {
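For completeness, a minimal sketch of how a client opts into this policy, mirroring the new test's call to ReplaceDatanodeOnFailure.write. The demo class name is invented; the dfs.client.block.write.replace-datanode-on-failure.* key names in the comments are the standard HDFS client keys, listed from memory and worth double-checking against hdfs-default.xml.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
    import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure.Policy;

    public class PolicySetupDemo {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Select the DEFAULT policy without best-effort, as the test does.
        // This fills in the replace-datanode-on-failure client keys, roughly:
        //   dfs.client.block.write.replace-datanode-on-failure.enable = true
        //   dfs.client.block.write.replace-datanode-on-failure.policy = DEFAULT
        //   dfs.client.block.write.replace-datanode-on-failure.best-effort = false
        ReplaceDatanodeOnFailure.write(Policy.DEFAULT, false, conf);

        // The client consults this policy when a pipeline datanode dies;
        // with these two patches, files written with replication 2 now
        // qualify for a replacement datanode as well.
      }
    }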