Skip to content

Commit

Permalink
add Configuration dfs.heartbeat.reregister.interval
Browse files Browse the repository at this point in the history
  • Loading branch information
lgh committed Oct 31, 2023
1 parent cf75d85 commit d17310f
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -972,6 +972,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final boolean DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT = true;
public static final String DFS_HEARTBEAT_INTERVAL_KEY = "dfs.heartbeat.interval";
public static final long DFS_HEARTBEAT_INTERVAL_DEFAULT = 3;
public static final String DFS_HEARTBEAT_REREGISTER_INTERVAL_KEY = "dfs.heartbeat.reregister.interval";
public static final long DFS_HEARTBEAT_REREGISTER_INTERVAL__DEFAULT = 9;
public static final String DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY =
"dfs.datanode.lifeline.interval.seconds";
public static final String DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS = "dfs.namenode.path.based.cache.retry.interval.ms";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ enum RunningState {
fullBlockReportLeaseId = 0;
scheduler = new Scheduler(dnConf.heartBeatInterval,
dnConf.getLifelineIntervalMs(), dnConf.blockReportInterval,
dnConf.outliersReportIntervalMs);
dnConf.outliersReportIntervalMs, dnConf.heartBeatReRegisterInterval);
// get the value of maxDataLength.
this.maxDataLength = dnConf.getMaxDataLength();
if (serviceId != null) {
Expand Down Expand Up @@ -1263,14 +1263,16 @@ static class Scheduler {
private final long lifelineIntervalMs;
private volatile long blockReportIntervalMs;
private volatile long outliersReportIntervalMs;
private final long heartBeatReRegisterIntervalMs;
private long reRegisterTime = 0;

Scheduler(long heartbeatIntervalMs, long lifelineIntervalMs,
long blockReportIntervalMs, long outliersReportIntervalMs) {
long blockReportIntervalMs, long outliersReportIntervalMs, long heartBeatReRegisterIntervalMs) {
this.heartbeatIntervalMs = heartbeatIntervalMs;
this.lifelineIntervalMs = lifelineIntervalMs;
this.blockReportIntervalMs = blockReportIntervalMs;
this.outliersReportIntervalMs = outliersReportIntervalMs;
this.heartBeatReRegisterIntervalMs = heartBeatReRegisterIntervalMs;
scheduleNextLifeline(nextHeartbeatTime);
}

Expand Down Expand Up @@ -1450,7 +1452,7 @@ long getOutliersReportIntervalMs() {
}

private boolean shouldReRegister() {
return monotonicNow() - reRegisterTime > this.heartbeatIntervalMs * 3;
return monotonicNow() - reRegisterTime > this.heartBeatReRegisterIntervalMs;
}

public void setReRegisterTime(long reRegisterTime) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PROCESS_COMMANDS_THRESHOLD_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_REREGISTER_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_REREGISTER_INTERVAL__DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
Expand Down Expand Up @@ -105,6 +107,7 @@ public class DNConf {

final long readaheadLength;
final long heartBeatInterval;
final long heartBeatReRegisterInterval;
private final long lifelineIntervalMs;
volatile long blockReportInterval;
volatile long blockReportSplitThreshold;
Expand Down Expand Up @@ -219,6 +222,9 @@ public DNConf(final Configurable dn) {
heartBeatInterval = getConf().getTimeDuration(DFS_HEARTBEAT_INTERVAL_KEY,
DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS,
TimeUnit.MILLISECONDS);
heartBeatReRegisterInterval = getConf().getTimeDuration(DFS_HEARTBEAT_REREGISTER_INTERVAL_KEY,
DFS_HEARTBEAT_REREGISTER_INTERVAL__DEFAULT, TimeUnit.SECONDS,
TimeUnit.MILLISECONDS);
long confLifelineIntervalMs =
getConf().getLong(DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY,
3 * getConf().getTimeDuration(DFS_HEARTBEAT_INTERVAL_KEY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_REREGISTER_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC;
import static org.apache.hadoop.test.PlatformAssumptions.assumeNotWindows;
Expand Down Expand Up @@ -561,6 +562,7 @@ public void testBalancerWithSortTopNodes() throws Exception {
final Configuration conf = new HdfsConfiguration();
initConf(conf);
conf.setInt(DFS_HEARTBEAT_INTERVAL_KEY, 30000);
conf.setLong(DFS_HEARTBEAT_REREGISTER_INTERVAL_KEY, 0);

final long capacity = 1000L;
final int diffBetweenNodes = 50;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,16 @@

package org.apache.hadoop.hdfs.server.datanode;

import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.hdfs.server.datanode.BPServiceActor.Scheduler;
import org.apache.hadoop.util.Time;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

import java.util.Arrays;
import java.util.List;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.lang.Math.abs;
import static org.hamcrest.core.Is.is;
Expand All @@ -52,6 +51,7 @@ public class TestBpServiceActorScheduler {
public Timeout timeout = new Timeout(300000);

private static final long HEARTBEAT_INTERVAL_MS = 5000; // 5 seconds
private static final long HEARTBEAT_REREGISTER_INTERVAL_MS = 5000 * 3; // 15 seconds
private static final long LIFELINE_INTERVAL_MS = 3 * HEARTBEAT_INTERVAL_MS;
private static final long BLOCK_REPORT_INTERVAL_MS = 10000; // 10 seconds
private static final long OUTLIER_REPORT_INTERVAL_MS = 10000; // 10 seconds
Expand Down Expand Up @@ -209,7 +209,8 @@ public void testScheduleLifeline() {
public void testScheduleLifelineScheduleTime() {
Scheduler mockScheduler = spy(new Scheduler(
HEARTBEAT_INTERVAL_MS, LIFELINE_INTERVAL_MS,
BLOCK_REPORT_INTERVAL_MS, OUTLIER_REPORT_INTERVAL_MS));
BLOCK_REPORT_INTERVAL_MS, OUTLIER_REPORT_INTERVAL_MS,
HEARTBEAT_REREGISTER_INTERVAL_MS));
long now = Time.monotonicNow();
mockScheduler.scheduleNextLifeline(now);
long mockMonotonicNow = now + LIFELINE_INTERVAL_MS * 2;
Expand All @@ -234,7 +235,8 @@ private Scheduler makeMockScheduler(long now) {
LOG.info("Using now = " + now);
Scheduler mockScheduler = spy(new Scheduler(
HEARTBEAT_INTERVAL_MS, LIFELINE_INTERVAL_MS,
BLOCK_REPORT_INTERVAL_MS, OUTLIER_REPORT_INTERVAL_MS));
BLOCK_REPORT_INTERVAL_MS, OUTLIER_REPORT_INTERVAL_MS,
HEARTBEAT_REREGISTER_INTERVAL_MS));
doReturn(now).when(mockScheduler).monotonicNow();
mockScheduler.setNextBlockReportTime(now);
mockScheduler.nextHeartbeatTime = now;
Expand Down

0 comments on commit d17310f

Please sign in to comment.