Skip to content

Commit

Permalink
Maintain pool selection for the minimizeDataMovement instance partition assignment strategy (apache#11953)
Browse files Browse the repository at this point in the history
  • Loading branch information
jackjlli authored Feb 1, 2024
1 parent 76379fb commit a8b1fa3
Show file tree
Hide file tree
Showing 15 changed files with 832 additions and 414 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public static InstanceAssignmentConfig getInstanceAssignmentConfig(TableConfig t
replicaGroupStrategyConfig.getNumInstancesPerPartition(), 0, 0, minimizeDataMovement, null);
}

return new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig);
return new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, minimizeDataMovement);
}

public static boolean isMirrorServerSetAssignment(TableConfig tableConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ public void testSerDe()
InstanceAssignmentConfig instanceAssignmentConfig =
new InstanceAssignmentConfig(new InstanceTagPoolConfig("tenant_OFFLINE", true, 3, null),
new InstanceConstraintConfig(Arrays.asList("constraint1", "constraint2")),
new InstanceReplicaGroupPartitionConfig(true, 0, 3, 5, 0, 0, false, null));
new InstanceReplicaGroupPartitionConfig(true, 0, 3, 5, 0, 0, false, null), null, false);
TableConfig tableConfig = tableConfigBuilder.setInstanceAssignmentConfigMap(
Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), instanceAssignmentConfig)).build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ public class FDAwareInstancePartitionSelector extends InstancePartitionSelector
private static final Logger LOGGER = LoggerFactory.getLogger(FDAwareInstancePartitionSelector.class);

public FDAwareInstancePartitionSelector(InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig,
String tableNameWithType, @Nullable InstancePartitions existingInstancePartitions) {
super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions);
String tableNameWithType, @Nullable InstancePartitions existingInstancePartitions, boolean minimizeDataMovement) {
super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions, minimizeDataMovement);
}

/**
Expand Down Expand Up @@ -109,10 +109,7 @@ private Pair<Integer, Integer> processReplicaGroupAssignmentPreconditions(int nu
return new ImmutablePair<>(numReplicaGroups, numInstancesPerReplicaGroup);
}

/**
* Selects instances based on the replica-group/partition config, and stores the result into the given instance
* partitions.
*/
@Override
public void selectInstances(Map<Integer, List<InstanceConfig>> faultDomainToInstanceConfigsMap,
InstancePartitions instancePartitions) {

Expand Down Expand Up @@ -152,7 +149,7 @@ public void selectInstances(Map<Integer, List<InstanceConfig>> faultDomainToInst
* initialize the new replicaGroupBasedAssignmentState for assignment,
* place existing instances in their corresponding positions
*/
if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && _existingInstancePartitions != null) {
if (_minimizeDataMovement) {
int numExistingReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
int numExistingPartitions = _existingInstancePartitions.getNumPartitions();
/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ public InstancePartitions assignInstances(InstancePartitionsType instancePartiti
}

public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType,
List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions, @Nullable
InstancePartitions preConfiguredInstancePartitions) {
List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions,
@Nullable InstancePartitions preConfiguredInstancePartitions) {
String tableNameWithType = _tableConfig.getTableName();
InstanceAssignmentConfig assignmentConfig =
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, instancePartitionsType);
Expand All @@ -88,8 +88,10 @@ private InstancePartitions getInstancePartitions(String instancePartitionsName,
String tableNameWithType = _tableConfig.getTableName();
LOGGER.info("Starting {} instance assignment for table {}", instancePartitionsName, tableNameWithType);

boolean minimizeDataMovement = instanceAssignmentConfig.isMinimizeDataMovement();
InstanceTagPoolSelector tagPoolSelector =
new InstanceTagPoolSelector(instanceAssignmentConfig.getTagPoolConfig(), tableNameWithType);
new InstanceTagPoolSelector(instanceAssignmentConfig.getTagPoolConfig(), tableNameWithType,
minimizeDataMovement, existingInstancePartitions);
Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap = tagPoolSelector.selectInstances(instanceConfigs);

InstanceConstraintConfig constraintConfig = instanceAssignmentConfig.getConstraintConfig();
Expand All @@ -106,7 +108,7 @@ private InstancePartitions getInstancePartitions(String instancePartitionsName,
InstancePartitionSelector instancePartitionSelector =
InstancePartitionSelectorFactory.getInstance(instanceAssignmentConfig.getPartitionSelector(),
instanceAssignmentConfig.getReplicaGroupPartitionConfig(), tableNameWithType, existingInstancePartitions,
preConfiguredInstancePartitions);
preConfiguredInstancePartitions, minimizeDataMovement);
InstancePartitions instancePartitions = new InstancePartitions(instancePartitionsName);
instancePartitionSelector.selectInstances(poolToInstanceConfigsMap, instancePartitions);
return instancePartitions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
Expand All @@ -29,12 +30,17 @@ abstract class InstancePartitionSelector {
protected final InstanceReplicaGroupPartitionConfig _replicaGroupPartitionConfig;
protected final String _tableNameWithType;
protected final InstancePartitions _existingInstancePartitions;
protected final boolean _minimizeDataMovement;

public InstancePartitionSelector(InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig,
String tableNameWithType, InstancePartitions existingInstancePartitions) {
String tableNameWithType, @Nullable InstancePartitions existingInstancePartitions, boolean minimizeDataMovement) {
_replicaGroupPartitionConfig = replicaGroupPartitionConfig;
_tableNameWithType = tableNameWithType;
_existingInstancePartitions = existingInstancePartitions;
// For backward compatibility, minimize data movement is enabled when it is set at either the top level or the
// instance partition selector level; it only takes effect when existing instance partitions are available
_minimizeDataMovement = (minimizeDataMovement || replicaGroupPartitionConfig.isMinimizeDataMovement())
&& existingInstancePartitions != null;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.controller.helix.core.assignment.instance;

import java.util.Arrays;
import javax.annotation.Nullable;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
Expand All @@ -31,25 +32,18 @@ private InstancePartitionSelectorFactory() {

public static InstancePartitionSelector getInstance(InstanceAssignmentConfig.PartitionSelector partitionSelector,
InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig, String tableNameWithType,
InstancePartitions existingInstancePartitions) {
return getInstance(partitionSelector, instanceReplicaGroupPartitionConfig, tableNameWithType,
existingInstancePartitions, null);
}

public static InstancePartitionSelector getInstance(InstanceAssignmentConfig.PartitionSelector partitionSelector,
InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig, String tableNameWithType,
InstancePartitions existingInstancePartitions, InstancePartitions preConfiguredInstancePartitions
) {
InstancePartitions existingInstancePartitions, @Nullable InstancePartitions preConfiguredInstancePartitions,
boolean minimizeDataMovement) {
switch (partitionSelector) {
case FD_AWARE_INSTANCE_PARTITION_SELECTOR:
return new FDAwareInstancePartitionSelector(instanceReplicaGroupPartitionConfig, tableNameWithType,
existingInstancePartitions);
existingInstancePartitions, minimizeDataMovement);
case INSTANCE_REPLICA_GROUP_PARTITION_SELECTOR:
return new InstanceReplicaGroupPartitionSelector(instanceReplicaGroupPartitionConfig, tableNameWithType,
existingInstancePartitions);
existingInstancePartitions, minimizeDataMovement);
case MIRROR_SERVER_SET_PARTITION_SELECTOR:
return new MirrorServerSetInstancePartitionSelector(instanceReplicaGroupPartitionConfig, tableNameWithType,
existingInstancePartitions, preConfiguredInstancePartitions);
existingInstancePartitions, preConfiguredInstancePartitions, minimizeDataMovement);
default:
throw new IllegalStateException("Unexpected PartitionSelector: " + partitionSelector + ", should be from"
+ Arrays.toString(InstanceAssignmentConfig.PartitionSelector.values()));
Expand Down
Loading

0 comments on commit a8b1fa3

Please sign in to comment.