Skip to content

Commit

Permalink
Add Segment Relocator Rebalance Configs to Controller Conf (apache#13863)
Browse files Browse the repository at this point in the history

* Do best-efforts rebalance in SegmentRelocator

* Add rebalance configs to controller conf

* Rollback formatting changes

* Remove comment

* Addressed comments

* Checkstyle

* Rollback unrelated formatting changes
  • Loading branch information
suddendust authored Sep 9, 2024
1 parent a9e4e52 commit d73aa1e
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,17 @@ public static class ControllerPeriodicTasksConf {
public static final String DEPRECATED_SEGMENT_RELOCATOR_FREQUENCY_IN_SECONDS =
"controller.segment.relocator.frequencyInSeconds";
public static final String SEGMENT_RELOCATOR_FREQUENCY_PERIOD = "controller.segment.relocator.frequencyPeriod";

// Whether the segment relocator should do a best-efforts rebalance. Default is 'true'.
public static final String SEGMENT_RELOCATOR_BEST_EFFORTS = "controller.segment.relocator.bestEfforts";
// For no-downtime rebalance, the minimum number of replicas to keep alive during rebalance, or the maximum number
// of replicas allowed to be unavailable if the value is negative. Default value is -1.
public static final String SEGMENT_RELOCATOR_MIN_AVAIL_REPLICAS =
"controller.segment.relocator.minAvailableReplicas";
// Value forwarded to the rebalance config's 'reassignInstances' option by the segment relocator. Default is 'false'.
public static final String SEGMENT_RELOCATOR_REASSIGN_INSTANCES = "controller.segment.relocator.reassignInstances";
// Value forwarded to the rebalance config's 'bootstrap' option by the segment relocator. Default is 'false'.
public static final String SEGMENT_RELOCATOR_BOOTSTRAP_SERVERS = "controller.segment.relocator.bootstrap";
// Value forwarded to the rebalance config's 'downtime' option by the segment relocator. Default is 'false'.
public static final String SEGMENT_RELOCATOR_DOWNTIME = "controller.segment.relocator.downtime";

public static final String REBALANCE_CHECKER_FREQUENCY_PERIOD = "controller.rebalance.checker.frequencyPeriod";
// Because segment level validation is expensive and requires heavy ZK access, we run segment level validation
// with a separate interval
Expand Down Expand Up @@ -689,6 +700,31 @@ public int getSegmentRelocatorFrequencyInSeconds() {
});
}

/**
 * Returns the best-efforts flag the segment relocator uses for its rebalances.
 *
 * @return {@code true} when the property is not set; otherwise the configured value as parsed by
 *     {@link Boolean#parseBoolean(String)}
 */
public boolean getSegmentRelocatorRebalanceConfigBestEfforts() {
  String configured = getProperty(ControllerPeriodicTasksConf.SEGMENT_RELOCATOR_BEST_EFFORTS);
  if (configured == null) {
    // Property absent: best-efforts is enabled by default
    return true;
  }
  return Boolean.parseBoolean(configured);
}

/**
 * Returns the minimum-available-replicas setting the segment relocator uses for its rebalances.
 *
 * <p>Surrounding whitespace in the configured value is ignored, so a value such as {@code " -1 "} is accepted.
 *
 * @return {@code -1} when the property is not set; otherwise the configured value parsed as an integer
 * @throws NumberFormatException if the configured value is not a valid integer; the message names the
 *     offending config key and value
 */
public int getSegmentRelocatorRebalanceConfigMinAvailReplicas() {
  String configured = getProperty(ControllerPeriodicTasksConf.SEGMENT_RELOCATOR_MIN_AVAIL_REPLICAS);
  if (configured == null) {
    return -1;
  }
  try {
    // Integer.parseInt rejects leading/trailing whitespace, so strip it first
    return Integer.parseInt(configured.trim());
  } catch (NumberFormatException e) {
    // Keep the exception type callers already see, but say which config key is malformed
    NumberFormatException withContext = new NumberFormatException(
        "Invalid integer value '" + configured + "' for config '"
            + ControllerPeriodicTasksConf.SEGMENT_RELOCATOR_MIN_AVAIL_REPLICAS + "'");
    withContext.initCause(e);
    throw withContext;
  }
}

/**
 * Returns the reassign-instances flag the segment relocator uses for its rebalances.
 *
 * @return {@code false} when the property is not set; otherwise the configured value as parsed by
 *     {@link Boolean#parseBoolean(String)}
 */
public boolean getSegmentRelocatorRebalanceConfigReassignInstances() {
  String configured = getProperty(ControllerPeriodicTasksConf.SEGMENT_RELOCATOR_REASSIGN_INSTANCES);
  // An absent property short-circuits to the default of false
  return configured != null && Boolean.parseBoolean(configured);
}

/**
 * Returns the bootstrap flag the segment relocator uses for its rebalances.
 *
 * @return {@code false} when the property is not set; otherwise the configured value as parsed by
 *     {@link Boolean#parseBoolean(String)}
 */
public boolean getSegmentRelocatorRebalanceConfigBootstrapServers() {
  String configured = getProperty(ControllerPeriodicTasksConf.SEGMENT_RELOCATOR_BOOTSTRAP_SERVERS);
  if (configured == null) {
    return false;
  }
  return Boolean.parseBoolean(configured);
}

/**
 * Returns the downtime flag the segment relocator uses for its rebalances.
 *
 * @return {@code false} when the property is not set; otherwise the configured value as parsed by
 *     {@link Boolean#parseBoolean(String)}
 */
public boolean getSegmentRelocatorRebalanceConfigDowntime() {
  String configured = getProperty(ControllerPeriodicTasksConf.SEGMENT_RELOCATOR_DOWNTIME);
  // An absent property short-circuits to the default of false
  return configured != null && Boolean.parseBoolean(configured);
}

public void setSegmentRelocatorFrequencyInSeconds(int segmentRelocatorFrequencyInSeconds) {
setProperty(ControllerPeriodicTasksConf.DEPRECATED_SEGMENT_RELOCATOR_FREQUENCY_IN_SECONDS,
Integer.toString(segmentRelocatorFrequencyInSeconds));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ public class SegmentRelocator extends ControllerPeriodicTask<Void> {
private final boolean _enableLocalTierMigration;
private final int _serverAdminRequestTimeoutMs;
private final long _externalViewCheckIntervalInMs;

// Rebalance options for the relocator: each is read once from the controller config in the constructor and
// applied to the rebalance config built in rebalanceTable().
private final boolean _bestEffortsRebalance;
private final int _minAvailReplicasDuringRebalance;
private final boolean _reassignInstancesDuringRebalance;
private final boolean _bootstrapServersDuringRebalance;
private final boolean _downtime;

private final long _externalViewStabilizationTimeoutInMs;
private final Set<String> _waitingTables;
private final BlockingQueue<String> _waitingQueue;
Expand All @@ -86,6 +93,11 @@ public SegmentRelocator(PinotHelixResourceManager pinotHelixResourceManager,
Math.min(taskIntervalInMs, config.getSegmentRelocatorExternalViewCheckIntervalInMs());
_externalViewStabilizationTimeoutInMs =
Math.min(taskIntervalInMs, config.getSegmentRelocatorExternalViewStabilizationTimeoutInMs());
_bestEffortsRebalance = config.getSegmentRelocatorRebalanceConfigBestEfforts();
_minAvailReplicasDuringRebalance = config.getSegmentRelocatorRebalanceConfigMinAvailReplicas();
_reassignInstancesDuringRebalance = config.getSegmentRelocatorRebalanceConfigReassignInstances();
_bootstrapServersDuringRebalance = config.getSegmentRelocatorRebalanceConfigBootstrapServers();
_downtime = config.getSegmentRelocatorRebalanceConfigDowntime();
if (config.isSegmentRelocatorRebalanceTablesSequentially()) {
_waitingTables = ConcurrentHashMap.newKeySet();
_waitingQueue = new LinkedBlockingQueue<>();
Expand Down Expand Up @@ -169,6 +181,12 @@ private void rebalanceTable(String tableNameWithType) {
rebalanceConfig.setExternalViewCheckIntervalInMs(_externalViewCheckIntervalInMs);
rebalanceConfig.setExternalViewStabilizationTimeoutInMs(_externalViewStabilizationTimeoutInMs);
rebalanceConfig.setUpdateTargetTier(TierConfigUtils.shouldRelocateToTiers(tableConfig));
// Best-efforts is configurable (default true); when enabled, do not fail the rebalance when the no-downtime
// contract cannot be achieved
rebalanceConfig.setBestEfforts(_bestEffortsRebalance);
rebalanceConfig.setReassignInstances(_reassignInstancesDuringRebalance);
rebalanceConfig.setBootstrap(_bootstrapServersDuringRebalance);
rebalanceConfig.setMinAvailableReplicas(_minAvailReplicasDuringRebalance);
rebalanceConfig.setDowntime(_downtime);

try {
// Relocating segments to new tiers needs two sequential actions: table rebalance and local tier migration.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,35 @@ public void supplyingOnlyNewConfigsShouldReturnCorrectlyConvertedValue() {
controllerConfig);
}

/**
 * With no relocator rebalance properties set, every getter must return its documented default.
 */
@Test
public void validateSegmentRelocatorRebalanceDefaultConfigs() {
  // An empty property map means none of the relocator rebalance keys is configured
  Map<String, Object> emptyProperties = new HashMap<>();
  ControllerConf defaults = new ControllerConf(emptyProperties);

  Assert.assertFalse(defaults.getSegmentRelocatorRebalanceConfigBootstrapServers());
  Assert.assertFalse(defaults.getSegmentRelocatorRebalanceConfigReassignInstances());
  Assert.assertTrue(defaults.getSegmentRelocatorRebalanceConfigBestEfforts());
  Assert.assertEquals(-1, defaults.getSegmentRelocatorRebalanceConfigMinAvailReplicas());
  Assert.assertFalse(defaults.getSegmentRelocatorRebalanceConfigDowntime());
}

/**
 * Explicitly configured relocator rebalance properties must be returned by the matching getters.
 *
 * <p>Every value set here differs from the getter's default (bestEfforts=true, reassignInstances=false,
 * minAvailableReplicas=-1, bootstrap=false, downtime=false), so the test fails if any key is ignored.
 */
@Test
public void validateSegmentRelocatorRebalanceConfigs() {
  // setup: use only non-default values so a getter that falls back to its default is detected
  Map<String, Object> controllerConfig = new HashMap<>();
  controllerConfig.put(SEGMENT_RELOCATOR_BEST_EFFORTS, false);
  controllerConfig.put(SEGMENT_RELOCATOR_REASSIGN_INSTANCES, true);
  controllerConfig.put(SEGMENT_RELOCATOR_MIN_AVAIL_REPLICAS, -2);
  controllerConfig.put(SEGMENT_RELOCATOR_BOOTSTRAP_SERVERS, true);
  controllerConfig.put(SEGMENT_RELOCATOR_DOWNTIME, true);
  ControllerConf conf = new ControllerConf(controllerConfig);

  Assert.assertTrue(conf.getSegmentRelocatorRebalanceConfigBootstrapServers());
  Assert.assertTrue(conf.getSegmentRelocatorRebalanceConfigReassignInstances());
  Assert.assertFalse(conf.getSegmentRelocatorRebalanceConfigBestEfforts());
  Assert.assertEquals(-2, conf.getSegmentRelocatorRebalanceConfigMinAvailReplicas());
  Assert.assertTrue(conf.getSegmentRelocatorRebalanceConfigDowntime());
}

@Test
public void shouldBeAbleToDisableUsingNewConfig() {
Map<String, Object> controllerConfig = new HashMap<>();
Expand Down

0 comments on commit d73aa1e

Please sign in to comment.