diff --git a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java index ebf38d308f59..13cc270954a0 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java @@ -122,7 +122,7 @@ public static InstanceAssignmentConfig getInstanceAssignmentConfig(TableConfig t replicaGroupStrategyConfig.getNumInstancesPerPartition(), 0, 0, minimizeDataMovement, null); } - return new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig); + return new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, minimizeDataMovement); } public static boolean isMirrorServerSetAssignment(TableConfig tableConfig, diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java index ed9d605af029..74f857a10270 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java @@ -212,7 +212,7 @@ public void testSerDe() InstanceAssignmentConfig instanceAssignmentConfig = new InstanceAssignmentConfig(new InstanceTagPoolConfig("tenant_OFFLINE", true, 3, null), new InstanceConstraintConfig(Arrays.asList("constraint1", "constraint2")), - new InstanceReplicaGroupPartitionConfig(true, 0, 3, 5, 0, 0, false, null)); + new InstanceReplicaGroupPartitionConfig(true, 0, 3, 5, 0, 0, false, null), null, false); TableConfig tableConfig = tableConfigBuilder.setInstanceAssignmentConfigMap( Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), instanceAssignmentConfig)).build(); diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java index 294971615a2a..89d64272e3d7 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java @@ -50,8 +50,8 @@ public class FDAwareInstancePartitionSelector extends InstancePartitionSelector private static final Logger LOGGER = LoggerFactory.getLogger(FDAwareInstancePartitionSelector.class); public FDAwareInstancePartitionSelector(InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig, - String tableNameWithType, @Nullable InstancePartitions existingInstancePartitions) { - super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions); + String tableNameWithType, @Nullable InstancePartitions existingInstancePartitions, boolean minimizeDataMovement) { + super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions, minimizeDataMovement); } /** @@ -109,10 +109,7 @@ private Pair processReplicaGroupAssignmentPreconditions(int nu return new ImmutablePair<>(numReplicaGroups, numInstancesPerReplicaGroup); } - /** - * Selects instances based on the replica-group/partition config, and stores the result into the given instance - * partitions. 
- */ + @Override public void selectInstances(Map> faultDomainToInstanceConfigsMap, InstancePartitions instancePartitions) { @@ -152,7 +149,7 @@ public void selectInstances(Map> faultDomainToInst * initialize the new replicaGroupBasedAssignmentState for assignment, * place existing instances in their corresponding positions */ - if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && _existingInstancePartitions != null) { + if (_minimizeDataMovement) { int numExistingReplicaGroups = _existingInstancePartitions.getNumReplicaGroups(); int numExistingPartitions = _existingInstancePartitions.getNumPartitions(); /* diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java index 6d869b86c169..09866c1ed77a 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java @@ -64,8 +64,8 @@ public InstancePartitions assignInstances(InstancePartitionsType instancePartiti } public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType, - List instanceConfigs, @Nullable InstancePartitions existingInstancePartitions, @Nullable - InstancePartitions preConfiguredInstancePartitions) { + List instanceConfigs, @Nullable InstancePartitions existingInstancePartitions, + @Nullable InstancePartitions preConfiguredInstancePartitions) { String tableNameWithType = _tableConfig.getTableName(); InstanceAssignmentConfig assignmentConfig = InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, instancePartitionsType); @@ -88,8 +88,10 @@ private InstancePartitions getInstancePartitions(String instancePartitionsName, String tableNameWithType = 
_tableConfig.getTableName(); LOGGER.info("Starting {} instance assignment for table {}", instancePartitionsName, tableNameWithType); + boolean minimizeDataMovement = instanceAssignmentConfig.isMinimizeDataMovement(); InstanceTagPoolSelector tagPoolSelector = - new InstanceTagPoolSelector(instanceAssignmentConfig.getTagPoolConfig(), tableNameWithType); + new InstanceTagPoolSelector(instanceAssignmentConfig.getTagPoolConfig(), tableNameWithType, + minimizeDataMovement, existingInstancePartitions); Map> poolToInstanceConfigsMap = tagPoolSelector.selectInstances(instanceConfigs); InstanceConstraintConfig constraintConfig = instanceAssignmentConfig.getConstraintConfig(); @@ -106,7 +108,7 @@ private InstancePartitions getInstancePartitions(String instancePartitionsName, InstancePartitionSelector instancePartitionSelector = InstancePartitionSelectorFactory.getInstance(instanceAssignmentConfig.getPartitionSelector(), instanceAssignmentConfig.getReplicaGroupPartitionConfig(), tableNameWithType, existingInstancePartitions, - preConfiguredInstancePartitions); + preConfiguredInstancePartitions, minimizeDataMovement); InstancePartitions instancePartitions = new InstancePartitions(instancePartitionsName); instancePartitionSelector.selectInstances(poolToInstanceConfigsMap, instancePartitions); return instancePartitions; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java index 396b8699241e..b80ad8bba9d9 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Map; +import javax.annotation.Nullable; import 
org.apache.helix.model.InstanceConfig; import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig; @@ -29,12 +30,17 @@ abstract class InstancePartitionSelector { protected final InstanceReplicaGroupPartitionConfig _replicaGroupPartitionConfig; protected final String _tableNameWithType; protected final InstancePartitions _existingInstancePartitions; + protected final boolean _minimizeDataMovement; public InstancePartitionSelector(InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig, - String tableNameWithType, InstancePartitions existingInstancePartitions) { + String tableNameWithType, @Nullable InstancePartitions existingInstancePartitions, boolean minimizeDataMovement) { _replicaGroupPartitionConfig = replicaGroupPartitionConfig; _tableNameWithType = tableNameWithType; _existingInstancePartitions = existingInstancePartitions; + // For backward compatibility, enable minimize data movement when it is enabled in top level or instance partition + // selector level + _minimizeDataMovement = (minimizeDataMovement || replicaGroupPartitionConfig.isMinimizeDataMovement()) + && existingInstancePartitions != null; } /** diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java index 256aa89b0232..8a343b159888 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java @@ -19,6 +19,7 @@ package org.apache.pinot.controller.helix.core.assignment.instance; import java.util.Arrays; +import javax.annotation.Nullable; import 
org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig; import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig; @@ -31,25 +32,18 @@ private InstancePartitionSelectorFactory() { public static InstancePartitionSelector getInstance(InstanceAssignmentConfig.PartitionSelector partitionSelector, InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig, String tableNameWithType, - InstancePartitions existingInstancePartitions) { - return getInstance(partitionSelector, instanceReplicaGroupPartitionConfig, tableNameWithType, - existingInstancePartitions, null); - } - - public static InstancePartitionSelector getInstance(InstanceAssignmentConfig.PartitionSelector partitionSelector, - InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig, String tableNameWithType, - InstancePartitions existingInstancePartitions, InstancePartitions preConfiguredInstancePartitions - ) { + InstancePartitions existingInstancePartitions, @Nullable InstancePartitions preConfiguredInstancePartitions, + boolean minimizeDataMovement) { switch (partitionSelector) { case FD_AWARE_INSTANCE_PARTITION_SELECTOR: return new FDAwareInstancePartitionSelector(instanceReplicaGroupPartitionConfig, tableNameWithType, - existingInstancePartitions); + existingInstancePartitions, minimizeDataMovement); case INSTANCE_REPLICA_GROUP_PARTITION_SELECTOR: return new InstanceReplicaGroupPartitionSelector(instanceReplicaGroupPartitionConfig, tableNameWithType, - existingInstancePartitions); + existingInstancePartitions, minimizeDataMovement); case MIRROR_SERVER_SET_PARTITION_SELECTOR: return new MirrorServerSetInstancePartitionSelector(instanceReplicaGroupPartitionConfig, tableNameWithType, - existingInstancePartitions, preConfiguredInstancePartitions); + existingInstancePartitions, preConfiguredInstancePartitions, minimizeDataMovement); default: throw new 
IllegalStateException("Unexpected PartitionSelector: " + partitionSelector + ", should be from" + Arrays.toString(InstanceAssignmentConfig.PartitionSelector.values())); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java index de1e681d17b3..8da6dbe2f62e 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java @@ -19,18 +19,19 @@ package org.apache.pinot.controller.helix.core.assignment.instance; import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import java.util.ArrayList; -import java.util.Collections; -import java.util.Deque; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashSet; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; import javax.annotation.Nullable; +import org.apache.commons.lang3.tuple.Triple; import org.apache.helix.model.InstanceConfig; import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig; @@ -46,14 +47,11 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele private static final Logger LOGGER = LoggerFactory.getLogger(InstanceReplicaGroupPartitionSelector.class); public InstanceReplicaGroupPartitionSelector(InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig, - String tableNameWithType, @Nullable InstancePartitions existingInstancePartitions) { - 
super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions); + String tableNameWithType, @Nullable InstancePartitions existingInstancePartitions, boolean minimizeDataMovement) { + super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions, minimizeDataMovement); } - /** - * Selects instances based on the replica-group/partition config, and stores the result into the given instance - * partitions. - */ + @Override public void selectInstances(Map> poolToInstanceConfigsMap, InstancePartitions instancePartitions) { int numPools = poolToInstanceConfigsMap.size(); @@ -62,297 +60,448 @@ public void selectInstances(Map> poolToInstanceCon int tableNameHash = Math.abs(_tableNameWithType.hashCode()); List pools = new ArrayList<>(poolToInstanceConfigsMap.keySet()); pools.sort(null); - LOGGER.info("Starting instance replica-group/partition selection for table: {} with hash: {} from pools: {}", - _tableNameWithType, tableNameHash, pools); + LOGGER.info("Starting instance replica-group/partition selection for table: {} with hash: {} from pools: {}, " + + "minimize data movement: {}", _tableNameWithType, tableNameHash, pools, _minimizeDataMovement); if (_replicaGroupPartitionConfig.isReplicaGroupBased()) { - // Replica-group based selection - - int numReplicaGroups = _replicaGroupPartitionConfig.getNumReplicaGroups(); - Preconditions.checkState(numReplicaGroups > 0, "Number of replica-groups must be positive"); - Map> poolToReplicaGroupIdsMap = new TreeMap<>(); - Map replicaGroupIdToPoolMap = new TreeMap<>(); - Map> poolToCandidateInstancesMap = new TreeMap<>(); - for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) { - // Pick one pool for each replica-group based on the table name hash - int pool = pools.get((tableNameHash + replicaId) % numPools); - poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId); - replicaGroupIdToPoolMap.put(replicaId, pool); - - Set candidateInstances = - 
poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>()); - List instanceConfigsInPool = poolToInstanceConfigsMap.get(pool); - instanceConfigsInPool.forEach(k -> candidateInstances.add(k.getInstanceName())); - } - LOGGER.info("Selecting {} replica-groups from pool: {} for table: {}", numReplicaGroups, poolToReplicaGroupIdsMap, - _tableNameWithType); - - int numInstancesPerReplicaGroup = _replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup(); - if (numInstancesPerReplicaGroup > 0) { - // Check if we have enough instances if number of instances per replica-group is configured - for (Map.Entry> entry : poolToReplicaGroupIdsMap.entrySet()) { - int pool = entry.getKey(); - int numInstancesInPool = poolToInstanceConfigsMap.get(pool).size(); - int numInstancesToSelect = numInstancesPerReplicaGroup * entry.getValue().size(); - Preconditions.checkState(numInstancesToSelect <= numInstancesInPool, - "Not enough qualified instances from pool: %s (%s in the pool, asked for %s)", pool, numInstancesInPool, - numInstancesToSelect); - } + if (_minimizeDataMovement) { + replicaGroupBasedMinimumMovement(poolToInstanceConfigsMap, instancePartitions, pools, tableNameHash); } else { - // Use as many instances as possible if number of instances per replica-group is not configured - numInstancesPerReplicaGroup = Integer.MAX_VALUE; - for (Map.Entry> entry : poolToReplicaGroupIdsMap.entrySet()) { - int pool = entry.getKey(); - int numReplicaGroupsInPool = entry.getValue().size(); - int numInstancesInPool = poolToInstanceConfigsMap.get(pool).size(); - Preconditions.checkState(numReplicaGroupsInPool <= numInstancesInPool, - "Not enough qualified instances from pool: %s, cannot select %s replica-groups from %s instances", pool, - numReplicaGroupsInPool, numInstancesInPool); - numInstancesPerReplicaGroup = - Math.min(numInstancesPerReplicaGroup, numInstancesInPool / numReplicaGroupsInPool); - } + replicaGroupBasedSimple(poolToInstanceConfigsMap, instancePartitions, 
pools, tableNameHash); } - LOGGER.info("Selecting {} instances per replica-group for table: {}", numInstancesPerReplicaGroup, - _tableNameWithType); + } else { + nonReplicaGroupBased(poolToInstanceConfigsMap, instancePartitions, pools, tableNameHash); + } + } - // Assign instances within a replica-group to one partition if not configured - int numPartitions = _replicaGroupPartitionConfig.getNumPartitions(); - if (numPartitions <= 0) { - numPartitions = 1; + private void nonReplicaGroupBased(Map> poolToInstanceConfigsMap, + InstancePartitions instancePartitions, List pools, int tableNameHash) { + // Pick one pool based on the table name hash + int pool = pools.get(Math.abs(tableNameHash % pools.size())); + LOGGER.info("Selecting pool: {} for table: {}", pool, _tableNameWithType); + List instanceConfigs = poolToInstanceConfigsMap.get(pool); + int numInstances = instanceConfigs.size(); + + // Assign all instances if not configured + int numInstancesToSelect = _replicaGroupPartitionConfig.getNumInstances(); + if (numInstancesToSelect > 0) { + Preconditions.checkState(numInstancesToSelect <= numInstances, + "Not enough qualified instances from pool: %s (%s in the pool, asked for %s)", pool, numInstances, + numInstancesToSelect); + } else { + numInstancesToSelect = numInstances; + } + + List instancesToSelect; + if (_minimizeDataMovement) { + List existingInstances = _existingInstancePartitions.getInstances(0, 0); + LinkedHashSet candidateInstances = Sets.newLinkedHashSetWithExpectedSize(instanceConfigs.size()); + instanceConfigs.forEach(k -> candidateInstances.add(k.getInstanceName())); + instancesToSelect = + selectInstancesWithMinimumMovement(numInstancesToSelect, candidateInstances, existingInstances); + LOGGER.info("Selecting instances: {} for table: {}, existing instances: {}", instancesToSelect, + _tableNameWithType, existingInstances); + } else { + instancesToSelect = new ArrayList<>(numInstancesToSelect); + for (int i = 0; i < numInstancesToSelect; i++) { + 
instancesToSelect.add(instanceConfigs.get(i).getInstanceName()); } - // Assign all instances within a replica-group to each partition if not configured - int numInstancesPerPartition = _replicaGroupPartitionConfig.getNumInstancesPerPartition(); - if (numInstancesPerPartition > 0) { - Preconditions.checkState(numInstancesPerPartition <= numInstancesPerReplicaGroup, - "Number of instances per partition: %s must be smaller or equal to number of instances per replica-group:" - + " %s", numInstancesPerPartition, numInstancesPerReplicaGroup); - } else { - numInstancesPerPartition = numInstancesPerReplicaGroup; + LOGGER.info("Selecting instances: {} for table: {}", instancesToSelect, _tableNameWithType); + } + // Set the instances as partition 0 replica 0 + instancePartitions.setInstances(0, 0, instancesToSelect); + } + + /** + * Selects the instances with minimum movement. + * For each instance in the existing instances, if it is still alive, keep it in the same position. Then fill the + * vacant positions with the remaining candidate instances. + * NOTE: This method will modify the candidate instances. 
+ */ + private static List selectInstancesWithMinimumMovement(int numInstancesToSelect, + LinkedHashSet candidateInstances, List existingInstances) { + // Initialize the list with empty positions to fill + List instancesToSelect = new ArrayList<>(numInstancesToSelect); + for (int i = 0; i < numInstancesToSelect; i++) { + instancesToSelect.add(null); + } + + // Keep the existing instances that are still alive + int numInstancesToCheck = Math.min(numInstancesToSelect, existingInstances.size()); + for (int i = 0; i < numInstancesToCheck; i++) { + String existingInstance = existingInstances.get(i); + if (candidateInstances.remove(existingInstance)) { + instancesToSelect.set(i, existingInstance); } - LOGGER.info("Selecting {} partitions, {} instances per partition within a replica-group for table: {}", - numPartitions, numInstancesPerPartition, _tableNameWithType); - - if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && _existingInstancePartitions != null) { - // Minimize data movement. - int existingNumPartitions = _existingInstancePartitions.getNumPartitions(); - int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups(); - int numCommonReplicaGroups = Math.min(numReplicaGroups, existingNumReplicaGroups); - - Map> replicaGroupIdToExistingInstancesMap = new TreeMap<>(); - // Step 1: find out the replica groups and their existing instances, - // so that these instances can be filtered out and won't be chosen for the other replica group. - for (int replicaGroupId = 0; replicaGroupId < numCommonReplicaGroups; replicaGroupId++) { - Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId); - if (pool == null) { - // Skip the replica group if it's no longer needed. 
- continue; - } + } - for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) { - List existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId); - replicaGroupIdToExistingInstancesMap.computeIfAbsent(replicaGroupId, k -> new HashSet<>()) - .addAll(existingInstances); - } - } + // Fill the vacant positions with the remaining candidate instances + Iterator iterator = candidateInstances.iterator(); + for (int i = 0; i < numInstancesToSelect; i++) { + if (instancesToSelect.get(i) == null) { + instancesToSelect.set(i, iterator.next()); + } + } - for (int replicaGroupId = 0; replicaGroupId < numCommonReplicaGroups; replicaGroupId++) { - Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId); - // Step 2: filter out instances that belong to other replica groups which should not be the candidate. - LinkedHashSet candidateInstances = new LinkedHashSet<>(poolToCandidateInstancesMap.get(pool)); - for (int otherReplicaGroupId = 0; - otherReplicaGroupId < existingNumReplicaGroups && otherReplicaGroupId < numReplicaGroups; - otherReplicaGroupId++) { - if (replicaGroupId != otherReplicaGroupId) { - candidateInstances.removeAll(replicaGroupIdToExistingInstancesMap.get(otherReplicaGroupId)); - } - } - LinkedHashSet chosenCandidateInstances = new LinkedHashSet<>(); - for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) { - List existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId); - // Step 3: figure out the missing instances and the new instances to fill their vacant positions. - List instancesToSelect = - getInstancesWithMinimumMovement(numInstancesPerPartition, candidateInstances, existingInstances); - chosenCandidateInstances.addAll(instancesToSelect); - instancePartitions.setInstances(partitionId, replicaGroupId, instancesToSelect); - } - // Remove instances that are already been chosen. 
- poolToCandidateInstancesMap.get(pool).removeAll(chosenCandidateInstances); - } + return instancesToSelect; + } - // If the new number of replica groups is greater than the existing number of replica groups. - for (int replicaGroupId = existingNumReplicaGroups; replicaGroupId < numReplicaGroups; replicaGroupId++) { - int pool = replicaGroupIdToPoolMap.get(replicaGroupId); - LinkedHashSet candidateInstances = new LinkedHashSet<>(poolToCandidateInstancesMap.get(pool)); - - Set chosenCandidateInstances = new HashSet<>(); - for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) { - List existingInstances = Collections.emptyList(); - List instancesToSelect = - getInstancesWithMinimumMovement(numInstancesPerPartition, candidateInstances, existingInstances); - chosenCandidateInstances.addAll(instancesToSelect); - instancePartitions.setInstances(partitionId, replicaGroupId, instancesToSelect); - } - // Remove instances that are already been chosen. - poolToCandidateInstancesMap.get(pool).removeAll(chosenCandidateInstances); - } - } else { - // Pick instances based on the sorted list of instance names. - String[][] replicaGroupIdToInstancesMap = new String[numReplicaGroups][numInstancesPerReplicaGroup]; - for (Map.Entry> entry : poolToReplicaGroupIdsMap.entrySet()) { - List instanceConfigsInPool = poolToInstanceConfigsMap.get(entry.getKey()); - List replicaGroupIdsInPool = entry.getValue(); - - // Use round-robin to assign instances to each replica-group so that they get instances with similar picking - // priority - // E.g. 
(within a pool, 10 instances, 2 replica-groups, 3 instances per replica-group) - // [i0, i1, i2, i3, i4, i5, i6, i7, i8, i9] - // r0 r1 r0 r1 r0 r1 - int instanceIdInPool = 0; - for (int instanceIdInReplicaGroup = 0; instanceIdInReplicaGroup < numInstancesPerReplicaGroup; - instanceIdInReplicaGroup++) { - for (int replicaGroupId : replicaGroupIdsInPool) { - replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup] = - instanceConfigsInPool.get(instanceIdInPool++).getInstanceName(); - } - } + private void replicaGroupBasedSimple(Map> poolToInstanceConfigsMap, + InstancePartitions instancePartitions, List pools, int tableNameHash) { + int numPools = pools.size(); + int numReplicaGroups = getNumReplicaGroups(); + + // Pick one pool for each replica-group based on the table name hash + Map> poolToReplicaGroupIdsMap = new TreeMap<>(); + int startIndex = Math.abs(tableNameHash % numPools); + for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) { + int pool = pools.get((startIndex + replicaGroupId) % numPools); + poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaGroupId); + } + LOGGER.info("Selecting {} replica-groups from pool: {} for table: {}", numReplicaGroups, poolToReplicaGroupIdsMap, + _tableNameWithType); + + int numInstancesPerReplicaGroup = + getNumInstancesPerReplicaGroup(poolToInstanceConfigsMap, poolToReplicaGroupIdsMap); + LOGGER.info("Selecting {} instances per replica-group for table: {}", numInstancesPerReplicaGroup, + _tableNameWithType); + int numPartitions = getNumPartitions(); + int numInstancesPerPartition = getNumInstancesPerPartition(numInstancesPerReplicaGroup); + LOGGER.info("Selecting {} partitions, {} instances per partition within a replica-group for table: {}", + numPartitions, numInstancesPerPartition, _tableNameWithType); + + // Pick instances based on the sorted list of instance names + String[][] replicaGroupIdToInstancesMap = new 
String[numReplicaGroups][numInstancesPerReplicaGroup]; + for (Map.Entry> entry : poolToReplicaGroupIdsMap.entrySet()) { + List instanceConfigsInPool = poolToInstanceConfigsMap.get(entry.getKey()); + List replicaGroupIdsInPool = entry.getValue(); + + // Use round-robin to assign instances to each replica-group so that they get instances with similar picking + // priority + // E.g. (within a pool, 10 instances, 2 replica-groups, 3 instances per replica-group) + // [i0, i1, i2, i3, i4, i5, i6, i7, i8, i9] + // r0 r1 r0 r1 r0 r1 + int instanceIdInPool = 0; + for (int instanceIdInReplicaGroup = 0; instanceIdInReplicaGroup < numInstancesPerReplicaGroup; + instanceIdInReplicaGroup++) { + for (int replicaGroupId : replicaGroupIdsInPool) { + replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup] = + instanceConfigsInPool.get(instanceIdInPool++).getInstanceName(); } + } + } - // Assign consecutive instances within a replica-group to each partition. - // E.g. (within a replica-group, 5 instances, 3 partitions, 3 instances per partition) - // [i0, i1, i2, i3, i4] - // p0 p0 p0 p1 p1 - // p1 p2 p2 p2 - for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) { - int instanceIdInReplicaGroup = 0; - for (int partitionId = 0; partitionId < numPartitions; partitionId++) { - List instancesInPartition = new ArrayList<>(numInstancesPerPartition); - for (int instanceIdInPartition = 0; instanceIdInPartition < numInstancesPerPartition; - instanceIdInPartition++) { - instancesInPartition.add(replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup]); - instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) % numInstancesPerReplicaGroup; - } - LOGGER.info("Selecting instances: {} for replica-group: {}, partition: {} for table: {}", - instancesInPartition, replicaGroupId, partitionId, _tableNameWithType); - instancePartitions.setInstances(partitionId, replicaGroupId, instancesInPartition); - } + // Assign consecutive instances within 
a replica-group to each partition + // E.g. (within a replica-group, 5 instances, 3 partitions, 3 instances per partition) + // [i0, i1, i2, i3, i4] + // p0 p0 p0 p1 p1 + // p1 p2 p2 p2 + for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) { + String[] instancesInReplicaGroup = replicaGroupIdToInstancesMap[replicaGroupId]; + int instanceIdInReplicaGroup = 0; + for (int partitionId = 0; partitionId < numPartitions; partitionId++) { + List instances = new ArrayList<>(numInstancesPerPartition); + for (int i = 0; i < numInstancesPerPartition; i++) { + instances.add(instancesInReplicaGroup[instanceIdInReplicaGroup]); + instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) % numInstancesPerReplicaGroup; } + LOGGER.info("Selecting instances: {} for replica-group: {}, partition: {} for table: {}", instances, + replicaGroupId, partitionId, _tableNameWithType); + instancePartitions.setInstances(partitionId, replicaGroupId, instances); } - } else { - // Non-replica-group based selection - - // Pick one pool based on the table name hash - int pool = pools.get(tableNameHash % numPools); - LOGGER.info("Selecting pool: {} for table: {}", pool, _tableNameWithType); - List instanceConfigs = poolToInstanceConfigsMap.get(pool); - int numInstanceConfigs = instanceConfigs.size(); - - // Assign all instances if not configured - int numInstancesToSelect = _replicaGroupPartitionConfig.getNumInstances(); - if (numInstancesToSelect > 0) { - Preconditions.checkState(numInstancesToSelect <= numInstanceConfigs, - "Not enough qualified instances from pool: %s (%s in the pool, asked for %s)", pool, numInstanceConfigs, + } + } + + /** Returns the number of replica-groups read from {@code _replicaGroupPartitionConfig}; the {@code Preconditions.checkState} call rejects a value that is not positive. */ + private int getNumReplicaGroups() { + int numReplicaGroups = _replicaGroupPartitionConfig.getNumReplicaGroups(); + Preconditions.checkState(numReplicaGroups > 0, "Number of replica-groups must be positive"); + return numReplicaGroups; + } + + /** Resolves the number of instances per replica-group. When the configured value is positive it is returned after checking that every pool holds at least (value * number of replica-groups assigned to that pool) instances. Otherwise the result is the minimum over all pools of (instances in pool / replica-groups in pool, integer division), after checking that each pool has at least as many instances as replica-groups assigned to it. */ + private int getNumInstancesPerReplicaGroup(Map> poolToInstanceConfigsMap, + Map> 
poolToReplicaGroupIdsMap) { + int numInstancesPerReplicaGroup = _replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup(); + if (numInstancesPerReplicaGroup > 0) { + // Check if we have enough instances if number of instances per replica-group is configured + for (Map.Entry> entry : poolToReplicaGroupIdsMap.entrySet()) { + int pool = entry.getKey(); + int numInstancesInPool = poolToInstanceConfigsMap.get(pool).size(); + int numInstancesToSelect = numInstancesPerReplicaGroup * entry.getValue().size(); + Preconditions.checkState(numInstancesToSelect <= numInstancesInPool, + "Not enough qualified instances from pool: %s (%s in the pool, asked for %s)", pool, numInstancesInPool, numInstancesToSelect); - } else { - numInstancesToSelect = numInstanceConfigs; } - - List instancesToSelect; - if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && _existingInstancePartitions != null) { - // Minimize data movement. - List existingInstances = _existingInstancePartitions.getInstances(0, 0); - LinkedHashSet candidateInstances = new LinkedHashSet<>(); - instanceConfigs.forEach(k -> candidateInstances.add(k.getInstanceName())); - instancesToSelect = - getInstancesWithMinimumMovement(numInstancesToSelect, candidateInstances, existingInstances); - } else { - // Select instances sequentially. 
- instancesToSelect = new ArrayList<>(numInstancesToSelect); - for (int i = 0; i < numInstancesToSelect; i++) { - instancesToSelect.add(instanceConfigs.get(i).getInstanceName()); - } + } else { + // Use as many instances as possible if number of instances per replica-group is not configured + /* Start from MAX_VALUE so the first pool's quotient always replaces it in the Math.min below. */ + numInstancesPerReplicaGroup = Integer.MAX_VALUE; + for (Map.Entry> entry : poolToReplicaGroupIdsMap.entrySet()) { + int pool = entry.getKey(); + int numReplicaGroupsInPool = entry.getValue().size(); + int numInstancesInPool = poolToInstanceConfigsMap.get(pool).size(); + Preconditions.checkState(numReplicaGroupsInPool <= numInstancesInPool, + "Not enough qualified instances from pool: %s, cannot select %s replica-groups from %s instances", pool, + numReplicaGroupsInPool, numInstancesInPool); + numInstancesPerReplicaGroup = + Math.min(numInstancesPerReplicaGroup, numInstancesInPool / numReplicaGroupsInPool); } - LOGGER.info("Selecting instances: {} for table: {}", instancesToSelect, _tableNameWithType); - // Set the instances as partition 0 replica 0 - instancePartitions.setInstances(0, 0, instancesToSelect); } + return numInstancesPerReplicaGroup; } - /** - * Select instances with minimum movement. - * This algorithm can solve the following scenarios: - * * swap an instance - * * add/remove replica groups - * * increase/decrease number of instances per replica group - * TODO: handle the scenarios that selected pools are changed. - * TODO: improve the algorithm by doing the following steps: - * 1. assign the existing instances for all partitions; - * 2. assign the vacant positions based on the partitions already assigned to each instance. 
- * @param numInstancesToSelect number of instances to select - * @param candidateInstances candidate instances to be selected - * @param existingInstances list of existing instances - */ - private static List getInstancesWithMinimumMovement(int numInstancesToSelect, - LinkedHashSet candidateInstances, List existingInstances) { - // Initialize the list with empty positions to fill. - List instancesToSelect = new ArrayList<>(numInstancesToSelect); - for (int i = 0; i < numInstancesToSelect; i++) { - instancesToSelect.add(null); + /** Returns the configured number of partitions, or 1 when the configured value is zero or negative. */ + private int getNumPartitions() { + // Assign instances within a replica-group to one partition if not configured + int numPartitions = _replicaGroupPartitionConfig.getNumPartitions(); + if (numPartitions <= 0) { + numPartitions = 1; } - Deque newlyAddedInstances = new LinkedList<>(); + return numPartitions; + } - // Find out the existing instances that are still alive. - Set existingInstancesStillAlive = new HashSet<>(); - for (String existingInstance : existingInstances) { - if (candidateInstances.contains(existingInstance)) { - existingInstancesStillAlive.add(existingInstance); + /** Returns the number of instances per partition: the configured value when positive (checked to be no larger than {@code numInstancesPerReplicaGroup}), otherwise {@code numInstancesPerReplicaGroup} itself. */ + private int getNumInstancesPerPartition(int numInstancesPerReplicaGroup) { + // Assign all instances within a replica-group to each partition if not configured + int numInstancesPerPartition = _replicaGroupPartitionConfig.getNumInstancesPerPartition(); + if (numInstancesPerPartition > 0) { + Preconditions.checkState(numInstancesPerPartition <= numInstancesPerReplicaGroup, + "Number of instances per partition: %s must be smaller or equal to number of instances per replica-group: %s", + numInstancesPerPartition, numInstancesPerReplicaGroup); + } else { + numInstancesPerPartition = numInstancesPerReplicaGroup; + } + return numInstancesPerPartition; + } + + /** Replica-group based selection that tries to keep the instances recorded in {@code _existingInstancePartitions}. It picks a pool for each replica-group (preferring the pool that already hosts most of that replica-group's existing instances, with at most ceil(numReplicaGroups / numPools) replica-groups per pool), keeps existing instances that are still in the chosen pool, fills the remaining slots with other instances from the pool, and writes the per-partition result into {@code instancePartitions}. */ + private void replicaGroupBasedMinimumMovement(Map> poolToInstanceConfigsMap, + InstancePartitions instancePartitions, List pools, int tableNameHash) { + int numPools = pools.size(); + int numReplicaGroups = 
getNumReplicaGroups(); + + Map instanceToPoolMap = new HashMap<>(); + for (Map.Entry> entry : poolToInstanceConfigsMap.entrySet()) { + int pool = entry.getKey(); + for (InstanceConfig instanceConfig : entry.getValue()) { + instanceToPoolMap.put(instanceConfig.getInstanceName(), pool); } } - // Find out the newly added instances. - for (String candidateInstance : candidateInstances) { - if (!existingInstancesStillAlive.contains(candidateInstance)) { - newlyAddedInstances.add(candidateInstance); + // Calculate the mapping from pool to replica-groups assigned to the pool + List> replicaGroupIdToExistingInstancesMap = new ArrayList<>(numReplicaGroups); + Map> poolToReplicaGroupIdsMap = new TreeMap<>(); + int maxReplicaGroupsPerPool = (numReplicaGroups + numPools - 1) / numPools; + int startIndex = Math.abs(tableNameHash % numPools); + + /* NOTE(review): _existingInstancePartitions is dereferenced here without a null check; presumably the caller only invokes this method when existing instance partitions are present - confirm against the call site. */ + int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups(); + int existingNumPartitions = _existingInstancePartitions.getNumPartitions(); + for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) { + // For each replica-group, gather number of existing instances within each pool + Set existingInstanceSet = new HashSet<>(); + replicaGroupIdToExistingInstancesMap.add(existingInstanceSet); + Map poolToNumExistingInstancesMap = new TreeMap<>(); + if (replicaGroupId < existingNumReplicaGroups) { + for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) { + List existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId); + existingInstanceSet.addAll(existingInstances); + for (String existingInstance : existingInstances) { + Integer existingPool = instanceToPoolMap.get(existingInstance); + if (existingPool != null) { + poolToNumExistingInstancesMap.merge(existingPool, 1, Integer::sum); + } + } + } + } + // Sort the pools based on the number of existing instances in the pool in descending order, then use the table + // name hash to break even + 
// Triple stores (pool, numExistingInstances, poolIndex) for sorting + List> triples = new ArrayList<>(numPools); + for (int i = 0; i < numPools; i++) { + int pool = pools.get((startIndex + replicaGroupId + i) % numPools); + triples.add(Triple.of(pool, poolToNumExistingInstancesMap.getOrDefault(pool, 0), i)); + } + triples.sort((o1, o2) -> { + int result = Integer.compare(o2.getMiddle(), o1.getMiddle()); + return result != 0 ? result : Integer.compare(o1.getRight(), o2.getRight()); + }); + /* Take the first pool in sorted order that has not yet reached maxReplicaGroupsPerPool replica-groups. */ + for (Triple triple : triples) { + int pool = triple.getLeft(); + List replicaGroupIds = poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()); + if (replicaGroupIds.size() < maxReplicaGroupsPerPool) { + replicaGroupIds.add(replicaGroupId); + break; + } } } + LOGGER.info("Selecting {} replica-groups from pool: {} for table: {}", numReplicaGroups, poolToReplicaGroupIdsMap, + _tableNameWithType); - int numExistingInstances = existingInstances.size(); - for (int i = 0; i < numInstancesToSelect; i++) { - String existingInstance = i < numExistingInstances ? 
existingInstances.get(i) : null; - String selectedInstance; - if (existingInstance != null && candidateInstances.contains(existingInstance)) { - selectedInstance = existingInstance; - existingInstancesStillAlive.remove(selectedInstance); - } else { - selectedInstance = newlyAddedInstances.poll(); + int numInstancesPerReplicaGroup = + getNumInstancesPerReplicaGroup(poolToInstanceConfigsMap, poolToReplicaGroupIdsMap); + LOGGER.info("Selecting {} instances per replica-group for table: {}", numInstancesPerReplicaGroup, + _tableNameWithType); + int numPartitions = getNumPartitions(); + int numInstancesPerPartition = getNumInstancesPerPartition(numInstancesPerReplicaGroup); + LOGGER.info("Selecting {} partitions, {} instances per partition within a replica-group for table: {}", + numPartitions, numInstancesPerPartition, _tableNameWithType); + + List> replicaGroupIdToInstancesMap = new ArrayList<>(numReplicaGroups); + for (int i = 0; i < numReplicaGroups; i++) { + replicaGroupIdToInstancesMap.add(new ArrayList<>(numInstancesPerReplicaGroup)); + } + for (Map.Entry> entry : poolToReplicaGroupIdsMap.entrySet()) { + // For each pool, keep the existing instances that are still alive within each replica-group + int pool = entry.getKey(); + List replicaGroupIds = entry.getValue(); + List newInstances = new ArrayList<>(); + for (InstanceConfig instanceConfig : poolToInstanceConfigsMap.get(pool)) { + String instanceName = instanceConfig.getInstanceName(); + boolean isExistingInstance = false; + for (int replicaGroupId : replicaGroupIds) { + List instances = replicaGroupIdToInstancesMap.get(replicaGroupId); + /* Skip replica-groups that are already full; an instance that is kept nowhere falls through to newInstances below. */ + if (instances.size() == numInstancesPerReplicaGroup) { + continue; + } + if (replicaGroupIdToExistingInstancesMap.get(replicaGroupId).contains(instanceName)) { + instances.add(instanceName); + isExistingInstance = true; + break; + } + } + if (!isExistingInstance) { + newInstances.add(instanceName); + } + } + // Fill the vacant positions with the new instances. 
First fill the replica groups with the least instances, then + // use round-robin to assign instances to each replica-group so that they get instances with similar picking + // priority. + int numInstancesToFill = numInstancesPerReplicaGroup * replicaGroupIds.size(); + for (int replicaGroupId : replicaGroupIds) { + numInstancesToFill -= replicaGroupIdToInstancesMap.get(replicaGroupId).size(); } - instancesToSelect.set(i, selectedInstance); - // If it's an existing alive instance, or it's for a new replica group, add the new instance to the tail, - // so that it won't be firstly chosen for the next partition. - // For newly added instances to fill the existing replica group, the sequence cannot change; - // otherwise there is no guarantee that same vacant position will be filled with the same new instance. - // The 'selectedInstance' object can still be null if there is no new instances from the candidate list. - if (selectedInstance != null && (i < numExistingInstances || existingInstances.isEmpty())) { - candidateInstances.remove(selectedInstance); - candidateInstances.add(selectedInstance); + /* Each iteration hands the next unused instance to the replica-group that currently has the fewest instances; the comparison is strict, so the earliest entry in replicaGroupIds wins ties. */ + for (int i = 0; i < numInstancesToFill; i++) { + int leastNumInstances = Integer.MAX_VALUE; + int replicaGroupIdWithLeastInstances = -1; + for (int replicaGroupId : replicaGroupIds) { + int numInstances = replicaGroupIdToInstancesMap.get(replicaGroupId).size(); + if (numInstances < leastNumInstances) { + leastNumInstances = numInstances; + replicaGroupIdWithLeastInstances = replicaGroupId; + } + } + replicaGroupIdToInstancesMap.get(replicaGroupIdWithLeastInstances).add(newInstances.get(i)); } } - // If there are still some vacant positions in the instance list, - // try to fill with instances which are either left over or newly added. 
- for (int i = 0; i < instancesToSelect.size(); i++) { - if (instancesToSelect.get(i) == null) { - if (!existingInstancesStillAlive.isEmpty()) { - Iterator iterator = existingInstancesStillAlive.iterator(); - String existingInstanceLeftOver = iterator.next(); - instancesToSelect.set(i, existingInstanceLeftOver); - iterator.remove(); - } else if (!newlyAddedInstances.isEmpty()) { - // pick a new instance to fill its vacant position. - String newInstance = newlyAddedInstances.pollFirst(); - instancesToSelect.set(i, newInstance); + /* Single-partition case: each replica-group's instance list is written to partition 0, reordered via selectInstancesWithMinimumMovement when the replica-group already existed. */ + if (numPartitions == 1) { + for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) { + List instancesInReplicaGroup = replicaGroupIdToInstancesMap.get(replicaGroupId); + if (replicaGroupId < existingNumReplicaGroups) { + List existingInstances = _existingInstancePartitions.getInstances(0, replicaGroupId); + LinkedHashSet candidateInstances = new LinkedHashSet<>(instancesInReplicaGroup); + List instances = + selectInstancesWithMinimumMovement(numInstancesPerReplicaGroup, candidateInstances, existingInstances); + LOGGER.info( + "Selecting instances: {} for replica-group: {}, partition: 0 for table: {}, existing instances: {}", + instances, replicaGroupId, _tableNameWithType, existingInstances); + instancePartitions.setInstances(0, replicaGroupId, instances); + } else { + LOGGER.info("Selecting instances: {} for replica-group: {}, partition: 0 for table: {}, " + + "there is no existing instances", instancesInReplicaGroup, replicaGroupId, _tableNameWithType); + instancePartitions.setInstances(0, replicaGroupId, instancesInReplicaGroup); + } + } + } else { + for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) { + List instancesInReplicaGroup = replicaGroupIdToInstancesMap.get(replicaGroupId); + if (replicaGroupId < existingNumReplicaGroups) { + /* NOTE(review): this cap is ceil(numInstancesPerReplicaGroup / numPartitions). The load implied by the layout looks like ceil(numPartitions * numInstancesPerPartition / numInstancesPerReplicaGroup); the two differ when partitions outnumber instances - confirm which is intended. The fill step below does not enforce the cap, so a too-small value only causes extra movement. */ + int maxNumPartitionsPerInstance = (numInstancesPerReplicaGroup + numPartitions - 1) / numPartitions; + Map instanceToNumPartitionsMap = 
Maps.newHashMapWithExpectedSize(numInstancesPerReplicaGroup); + for (String instance : instancesInReplicaGroup) { + instanceToNumPartitionsMap.put(instance, 0); + } + + List> partitionIdToInstancesMap = new ArrayList<>(numPartitions); + List> partitionIdToInstanceSetMap = new ArrayList<>(numPartitions); + List> partitionIdToExistingInstancesMap = new ArrayList<>(existingNumPartitions); + for (int partitionId = 0; partitionId < numPartitions; partitionId++) { + // Initialize the list with empty positions to fill + List instances = new ArrayList<>(numInstancesPerPartition); + for (int i = 0; i < numInstancesPerPartition; i++) { + instances.add(null); + } + partitionIdToInstancesMap.add(instances); + Set instanceSet = Sets.newHashSetWithExpectedSize(numInstancesPerPartition); + partitionIdToInstanceSetMap.add(instanceSet); + + // Keep the existing instances that are still alive + if (partitionId < existingNumPartitions) { + List existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId); + partitionIdToExistingInstancesMap.add(existingInstances); + int numInstancesToCheck = Math.min(numInstancesPerPartition, existingInstances.size()); + for (int i = 0; i < numInstancesToCheck; i++) { + String existingInstance = existingInstances.get(i); + Integer numPartitionsOnInstance = instanceToNumPartitionsMap.get(existingInstance); + /* Keep the instance at its original position only if it is still in this replica-group (non-null count) and is below the per-instance cap. */ + if (numPartitionsOnInstance != null && numPartitionsOnInstance < maxNumPartitionsPerInstance) { + instances.set(i, existingInstance); + instanceSet.add(existingInstance); + instanceToNumPartitionsMap.put(existingInstance, numPartitionsOnInstance + 1); + } + } + } + } + + // Fill the vacant positions with instance that serves the least partitions + for (int partitionId = 0; partitionId < numPartitions; partitionId++) { + List instances = partitionIdToInstancesMap.get(partitionId); + Set instanceSet = partitionIdToInstanceSetMap.get(partitionId); + int numInstancesToFill = numInstancesPerPartition - instanceSet.size(); + 
if (numInstancesToFill > 0) { + // Triple stores (instance, numPartitionsOnInstance, instanceIndex) for sorting + List> triples = new ArrayList<>(numInstancesPerReplicaGroup); + for (int i = 0; i < numInstancesPerReplicaGroup; i++) { + String instance = instancesInReplicaGroup.get(i); + if (!instanceSet.contains(instance)) { + triples.add(Triple.of(instance, instanceToNumPartitionsMap.get(instance), i)); + } + } + triples.sort((o1, o2) -> { + int result = Integer.compare(o1.getMiddle(), o2.getMiddle()); + return result != 0 ? result : Integer.compare(o1.getRight(), o2.getRight()); + }); + /* Fill the null positions in order with the least-loaded instances; ties are broken by position within the replica-group. */ + int instanceIdToFill = 0; + for (int i = 0; i < numInstancesPerPartition; i++) { + if (instances.get(i) == null) { + String instance = triples.get(instanceIdToFill++).getLeft(); + instances.set(i, instance); + instanceToNumPartitionsMap.put(instance, instanceToNumPartitionsMap.get(instance) + 1); + } + } + } + + if (partitionId < existingNumPartitions) { + LOGGER.info( + "Selecting instances: {} for replica-group: {}, partition: {} for table: {}, existing instances: {}", + instances, replicaGroupId, partitionId, _tableNameWithType, + partitionIdToExistingInstancesMap.get(partitionId)); + } else { + LOGGER.info("Selecting instances: {} for replica-group: {}, partition: {} for table: {}, " + + "there is no existing instances", instances, replicaGroupId, partitionId, _tableNameWithType); + } + instancePartitions.setInstances(partitionId, replicaGroupId, instances); + } + } else { + // Assign consecutive instances within a replica-group to each partition + int instanceIdInReplicaGroup = 0; + for (int partitionId = 0; partitionId < numPartitions; partitionId++) { + List instances = new ArrayList<>(numInstancesPerPartition); + for (int i = 0; i < numInstancesPerPartition; i++) { + instances.add(instancesInReplicaGroup.get(instanceIdInReplicaGroup)); + instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) % numInstancesPerReplicaGroup; + } + LOGGER.info("Selecting instances: {} for 
replica-group: {}, partition: {} for table: {}, " + + "there is no existing instances", instances, replicaGroupId, partitionId, _tableNameWithType); + instancePartitions.setInstances(partitionId, replicaGroupId, instances); + } } } } - return instancesToSelect; } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java index 5aefd1ad6986..28d58bbbcdd9 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java @@ -21,11 +21,16 @@ import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.Comparator; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; +import javax.annotation.Nullable; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.tuple.Triple; import org.apache.helix.model.InstanceConfig; +import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.common.utils.config.InstanceUtils; import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig; import org.slf4j.Logger; @@ -40,10 +45,15 @@ public class InstanceTagPoolSelector { private final InstanceTagPoolConfig _tagPoolConfig; private final String _tableNameWithType; + private final boolean _minimizeDataMovement; + private final InstancePartitions _existingInstancePartitions; - public InstanceTagPoolSelector(InstanceTagPoolConfig tagPoolConfig, String tableNameWithType) { + public InstanceTagPoolSelector(InstanceTagPoolConfig tagPoolConfig, String tableNameWithType, + boolean minimizeDataMovement, @Nullable InstancePartitions 
existingInstancePartitions) { _tagPoolConfig = tagPoolConfig; _tableNameWithType = tableNameWithType; + _minimizeDataMovement = minimizeDataMovement && existingInstancePartitions != null; + _existingInstancePartitions = existingInstancePartitions; } /** @@ -70,12 +80,14 @@ public Map> selectInstances(List i if (_tagPoolConfig.isPoolBased()) { // Pool based selection + Map instanceToPoolMap = new HashMap<>(); // Extract the pool information from the instance configs for (InstanceConfig instanceConfig : candidateInstanceConfigs) { Map poolMap = instanceConfig.getRecord().getMapField(InstanceUtils.POOL_KEY); if (poolMap != null && poolMap.containsKey(tag)) { int pool = Integer.parseInt(poolMap.get(tag)); poolToInstanceConfigsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(instanceConfig); + instanceToPoolMap.put(instanceConfig.getInstanceName(), pool); } } Preconditions.checkState(!poolToInstanceConfigsMap.isEmpty(), @@ -89,16 +101,15 @@ public Map> selectInstances(List i // Calculate the pools to select based on the selection config Set pools = poolToInstanceConfigsMap.keySet(); List poolsToSelect = _tagPoolConfig.getPools(); - if (poolsToSelect != null && !poolsToSelect.isEmpty()) { + if (!CollectionUtils.isEmpty(poolsToSelect)) { Preconditions.checkState(pools.containsAll(poolsToSelect), "Cannot find all instance pools configured: %s", poolsToSelect); } else { int numPools = poolToInstanceConfigsMap.size(); int numPoolsToSelect = _tagPoolConfig.getNumPools(); if (numPoolsToSelect > 0) { - Preconditions - .checkState(numPoolsToSelect <= numPools, "Not enough instance pools (%s in the cluster, asked for %s)", - numPools, numPoolsToSelect); + Preconditions.checkState(numPoolsToSelect <= numPools, + "Not enough instance pools (%s in the cluster, asked for %s)", numPools, numPoolsToSelect); } else { numPoolsToSelect = numPools; } @@ -110,10 +121,44 @@ public Map> selectInstances(List i } // Select pools based on the table name hash to evenly distribute the tables 
- poolsToSelect = new ArrayList<>(numPoolsToSelect); List poolsInCluster = new ArrayList<>(pools); - for (int i = 0; i < numPoolsToSelect; i++) { - poolsToSelect.add(poolsInCluster.get((tableNameHash + i) % numPools)); + int startIndex = Math.abs(tableNameHash % numPools); + poolsToSelect = new ArrayList<>(numPoolsToSelect); + if (_minimizeDataMovement) { + assert _existingInstancePartitions != null; + Map poolToNumExistingInstancesMap = new TreeMap<>(); + int existingNumPartitions = _existingInstancePartitions.getNumPartitions(); + int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups(); + for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) { + for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) { + List existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId); + for (String existingInstance : existingInstances) { + Integer existingPool = instanceToPoolMap.get(existingInstance); + if (existingPool != null) { + poolToNumExistingInstancesMap.merge(existingPool, 1, Integer::sum); + } + } + } + } + // Sort the pools based on the number of existing instances in the pool in descending order, then use the + // table name hash to break even + // Triple stores (pool, numExistingInstances, poolIndex) for sorting + List> triples = new ArrayList<>(numPools); + for (int i = 0; i < numPools; i++) { + int pool = poolsInCluster.get((startIndex + i) % numPools); + triples.add(Triple.of(pool, poolToNumExistingInstancesMap.getOrDefault(pool, 0), i)); + } + triples.sort((o1, o2) -> { + int result = Integer.compare(o2.getMiddle(), o1.getMiddle()); + return result != 0 ? 
result : Integer.compare(o1.getRight(), o2.getRight()); + }); + for (int i = 0; i < numPoolsToSelect; i++) { + poolsToSelect.add(triples.get(i).getLeft()); + } + } else { + for (int i = 0; i < numPoolsToSelect; i++) { + poolsToSelect.add(poolsInCluster.get((startIndex + i) % numPools)); + } } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MirrorServerSetInstancePartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MirrorServerSetInstancePartitionSelector.java index 6b4086615a53..f273866eeb74 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MirrorServerSetInstancePartitionSelector.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MirrorServerSetInstancePartitionSelector.java @@ -76,8 +76,8 @@ public class MirrorServerSetInstancePartitionSelector extends InstancePartitionS public MirrorServerSetInstancePartitionSelector(InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig, String tableNameWithType, @Nullable InstancePartitions existingInstancePartitions, - InstancePartitions preConfiguredInstancePartitions) { - super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions); + InstancePartitions preConfiguredInstancePartitions, boolean minimizeDataMovement) { + super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions, minimizeDataMovement); _preConfiguredInstancePartitions = preConfiguredInstancePartitions; _numTargetInstancesPerReplicaGroup = _replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup(); _numTargetReplicaGroups = _replicaGroupPartitionConfig.getNumReplicaGroups(); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java index dedc79384edf..9feb8844c878 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java @@ -118,7 +118,7 @@ public void testInstanceAssignment() // Add OFFLINE instance assignment config to the offline table config InstanceAssignmentConfig offlineInstanceAssignmentConfig = new InstanceAssignmentConfig( new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(SERVER_TENANT_NAME), false, 0, null), null, - new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, null)); + new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, null), null, false); offlineTableConfig.setInstanceAssignmentConfigMap( Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), offlineInstanceAssignmentConfig)); _helixResourceManager.setExistingTableConfig(offlineTableConfig); @@ -136,7 +136,7 @@ public void testInstanceAssignment() // Add CONSUMING instance assignment config to the real-time table config InstanceAssignmentConfig consumingInstanceAssignmentConfig = new InstanceAssignmentConfig( new InstanceTagPoolConfig(TagNameUtils.getRealtimeTagForTenant(SERVER_TENANT_NAME), false, 0, null), null, - new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, null)); + new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, null), null, false); realtimeTableConfig.setInstanceAssignmentConfigMap( Collections.singletonMap(InstancePartitionsType.CONSUMING.toString(), consumingInstanceAssignmentConfig)); _helixResourceManager.setExistingTableConfig(realtimeTableConfig); @@ -164,7 +164,7 @@ public void testInstanceAssignment() null))); InstanceAssignmentConfig tierInstanceAssignmentConfig = new 
InstanceAssignmentConfig( new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(SERVER_TENANT_NAME), false, 0, null), null, - new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, null)); + new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, null), null, false); Map instanceAssignmentConfigMap = new HashMap<>(); instanceAssignmentConfigMap.put(InstancePartitionsType.OFFLINE.toString(), offlineInstanceAssignmentConfig); instanceAssignmentConfigMap.put(TIER_NAME, tierInstanceAssignmentConfig); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java index b25a529e101f..113d4e164965 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java @@ -50,7 +50,9 @@ import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.testng.annotations.Test; -import static org.testng.Assert.*; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.fail; public class InstanceAssignmentTest { @@ -198,8 +200,8 @@ public void testDefaultOfflineReplicaGroup() { // r0: [i8, i1, i4] // p0, p0, p1 // p1 - // r1: [i9, i10, i5] - // p0, p0, p1 + // r1: [i9, i5, i10] + // p0, p1, p0 // p1 // r2: [i0, i3, i11] // p0, p0, p1 @@ -217,7 +219,7 @@ public void testDefaultOfflineReplicaGroup() { assertEquals(instancePartitions.getInstances(1, 2), Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 0)); - // Add 2 more instances to the ZK and increase the number of instances per replica group from 2 to 3. 
+ // Add 2 more instances to the ZK and increase the number of instances per partition from 2 to 3. for (int i = numInstances + 2; i < numInstances + 4; i++) { InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i); instanceConfig.addTag(OFFLINE_TAG); @@ -233,34 +235,29 @@ public void testDefaultOfflineReplicaGroup() { // Math.abs("myTable_OFFLINE".hashCode()) % 12 = 2 // [i10, i11, i12, i13, i3, i4, i5, i11, i7, i8, i9, i0, i1] - // For r0, the candidate instances are [i12, i13, i4, i7, i8, i1]. - // For p0, since the existing assignment is [i8, i1], the next available instance from the candidates is i12. - // For p1, the existing assignment is [i4, i8], the next available instance is also i12. - // r0: [i12, i4, i8, i1] - // For r1, the candidate instances become [i10, i13, i5, i7, i9]. - // For p0, since the existing assignment is [i9, i10], the next available instance is i13 (new instance). - // For p1, the existing assignment is [i5, i9], the next available one from the candidates is i10, but since - // i10 is already used in the former partition, it got added to the tail, so the next available one is i13. - // r1: [i10, i13, i5, i9] - // For r2, the candidate instances become [i11, i3, i7, i0]. - // For p0, the existing assignment is [i0, i3], the next available instance from the candidates is i11. - // For p1, the existing assignment is [i11, i0], the next available instance from the candidates is i3, but - // since i3 is already used in the former partition, it got appended to the tail, so the next available one is i7. 
- // r2: [i11, i3, i7, i0] + // r0: [i8, i1, i4, i12] + // p0, p0, p1, p0 + // p1, p1 + // r1: [i9, i5, i10, i13] + // p0, p1, p0, p0 + // p1, p1 + // r2: [i0, i3, i11, i7] + // p0, p0, p1, p0 + // p1, p1 assertEquals(instancePartitions.getInstances(0, 0), Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 12)); assertEquals(instancePartitions.getInstances(1, 0), - Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 12)); + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 1)); assertEquals(instancePartitions.getInstances(0, 1), Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 13)); assertEquals(instancePartitions.getInstances(1, 1), - Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 13)); + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 10)); assertEquals(instancePartitions.getInstances(0, 2), - Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 11)); + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 7)); assertEquals(instancePartitions.getInstances(1, 2), - Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 7)); + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3)); - // Reduce the number of instances per replica group from 3 to 2. + // Reduce the number of instances per partition from 3 to 2. 
numInstancesPerPartition = 2; tableConfig.getValidationConfig() .setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(partitionColumnName, numInstancesPerPartition)); @@ -374,7 +371,7 @@ public void testMirrorServerSetBasedRandomInner(int loopCount) throws FileNotFou TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString()))) + InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false))) .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")) .build(); InstanceAssignmentDriver driver = new InstanceAssignmentDriver(tableConfig); @@ -480,7 +477,7 @@ public void testMirrorServerSetBased() { TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString()))) + InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false))) .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build(); InstanceAssignmentDriver driver = new InstanceAssignmentDriver(tableConfig); InstancePartitions preConfigured = new InstancePartitions("preConfigured"); @@ -561,7 +558,7 @@ public void testMirrorServerSetBased() { tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new 
InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString()))) + InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false))) .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build(); driver = new InstanceAssignmentDriver(tableConfig); preConfigured = new InstancePartitions("preConfigured"); @@ -664,7 +661,7 @@ public void testMirrorServerSetBased() { tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString()))) + InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false))) .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build(); driver = new InstanceAssignmentDriver(tableConfig); preConfigured = new InstancePartitions("preConfigured"); @@ -756,7 +753,7 @@ public void testMirrorServerSetBased() { tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString()))) + InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false))) .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build(); driver = new InstanceAssignmentDriver(tableConfig); preConfigured = new InstancePartitions("preConfigured"); @@ -851,7 +848,7 @@ public void 
testMirrorServerSetBased() { tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString()))) + InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false))) .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build(); driver = new InstanceAssignmentDriver(tableConfig); preConfigured = new InstancePartitions("preConfigured"); @@ -956,7 +953,7 @@ public void testMirrorServerSetBased() { tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString()))) + InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false))) .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build(); driver = new InstanceAssignmentDriver(tableConfig); preConfigured = new InstancePartitions("preConfigured"); @@ -1063,7 +1060,7 @@ public void testMirrorServerSetBased() { tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString()))) + InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false))) 
.setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build(); driver = new InstanceAssignmentDriver(tableConfig); preConfigured = new InstancePartitions("preConfigured"); @@ -1156,7 +1153,7 @@ public void testMirrorServerSetBased() { tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString()))) + InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false))) .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build(); driver = new InstanceAssignmentDriver(tableConfig); @@ -1230,7 +1227,7 @@ public void testMirrorServerSetBased() { tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString()))) + InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false))) .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build(); driver = new InstanceAssignmentDriver(tableConfig); @@ -1311,7 +1308,7 @@ public void testPoolBased() { new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, false, null); TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig))).build(); + new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, false))).build(); InstanceAssignmentDriver driver = new InstanceAssignmentDriver(tableConfig); // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0 @@ -1364,7 +1361,7 @@ public void testPoolBased() { // Select all 3 pools in pool selection tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig))); + new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, false))); // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2 // All instances in pool 2 should be assigned to replica-group 0, and all instances in pool 0 should be assigned to @@ -1386,7 +1383,7 @@ public void testPoolBased() { // Select pool 0 and 1 in pool selection tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, Arrays.asList(0, 1)); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig))); + new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, false))); // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0 // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 1 should be assigned to @@ -1408,7 +1405,7 @@ public void testPoolBased() { numReplicaGroups = numPools; replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, false, null); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig))); + new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, false))); // 
Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0 // [pool0, pool1] @@ -1438,7 +1435,7 @@ public void testPoolBased() { replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, true, null); tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig))); + new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, true))); // Reset the instance configs to have only two pools. instanceConfigs.clear(); numInstances = 10; @@ -1487,7 +1484,7 @@ public void testPoolBased() { // Select pool 0 and 1 in pool selection tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, Arrays.asList(0, 1)); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig))); + new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, true))); // Get the latest existingInstancePartitions from last computation. existingInstancePartitions = instancePartitions; @@ -1514,7 +1511,7 @@ public void testPoolBased() { numReplicaGroups = 3; replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, true, null); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig))); + new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, true))); // Get the latest existingInstancePartitions from last computation. 
existingInstancePartitions = instancePartitions; @@ -1593,7 +1590,7 @@ public void testPoolBased() { numReplicaGroups = 2; replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, true, null); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig))); + new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, true))); // Get the latest existingInstancePartitions from last computation. existingInstancePartitions = instancePartitions; @@ -1693,6 +1690,109 @@ public void testPoolBased() { assertEquals(instancePartitions.getInstances(0, 1), Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 7, SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 6)); + + // The below is the test suite for testing out minimizeDataMovement with pool configs + // Add the third pool with same number of instances but keep number of pools the same (i.e. 2) + numPools = 3; + numInstances = numPools * numInstancesPerPool; + for (int i = numInstances + 4; i < numInstances + 9; i++) { + InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i); + instanceConfig.addTag(OFFLINE_TAG); + int pool = numPools - 1; + instanceConfig.getRecord() + .setMapField(InstanceUtils.POOL_KEY, Collections.singletonMap(OFFLINE_TAG, Integer.toString(pool))); + instanceConfigs.add(instanceConfig); + } + + // Get the latest existingInstancePartitions from last computation. + existingInstancePartitions = instancePartitions; + + // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2, but since minimizeDataMovement is enabled, + // same pools would be re-used. + // [pool0, pool1] + // r0 r1 + // Thus, the instance partition assignment remains the same as the previous one. 
+ // pool 0: [ i12, i4, i0, i1, i10 ] + // pool 1: [ i7, i9, i11, i13, i6 ] + instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions); + assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups); + assertEquals(instancePartitions.getNumPartitions(), 1); + assertEquals(instancePartitions.getInstances(0, 0), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 12, SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 0, + SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 10)); + assertEquals(instancePartitions.getInstances(0, 1), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 7, SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 11, + SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 6)); + + // Set tag pool config to 3. + tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null); + tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), + new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, true))); + + // Get the latest existingInstancePartitions from last computation. + existingInstancePartitions = instancePartitions; + + // Putting the existingPoolToInstancesMap shouldn't change the instance assignment, + // as there are only 2 replica groups needed. + // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0 + // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2 + // But since Pool 0 and Pool 1 are already being used for the table, the numReplica remains at 2, + // so the 3rd pool (Pool 2) won't be picked up. + // Thus, the instance partition assignment remains the same as the existing one.
+ // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 1 should be assigned to + // replica-group 1 + // Now in poolToInstancesMap: + // pool 0: [ i12, i4, i0, i1, i10 ] + // pool 1: [ i7, i9, i11, i13, i6 ] + instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions); + assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups); + assertEquals(instancePartitions.getNumPartitions(), 1); + assertEquals(instancePartitions.getInstances(0, 0), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 12, SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 0, + SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 10)); + assertEquals(instancePartitions.getInstances(0, 1), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 7, SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 11, + SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 6)); + + // Set replica group from 2 to 3 + numReplicaGroups = 3; + replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, true, null); + tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), + new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, true))); + + // Get the latest existingInstancePartitions from last computation. 
+ existingInstancePartitions = instancePartitions; + + // Now that 1 more replica group is needed, Pool 2 will be chosen for the 3rd replica group + // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0 + // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2 + // [pool0, pool1, pool2] + // r0 r1 r2 + // Each replica-group should have 5 instances assigned + // Math.abs("myTable_OFFLINE".hashCode()) % 5 = 3 + // Latest instances from ZK: + // pool 0: [ i3, i4, i0, i1, i2 ] + // pool 1: [ i8, i9, i5, i6, i7 ] + // pool 2: [ i22,i23,i19,i20,i21] + // Thus, the new assignment will become: + // pool 0: [ i12, i4, i0, i1, i10 ] + // pool 1: [ i7, i9, i11, i13, i6 ] + // pool 2: [ i22, i23, i19, i20,i21 ] + instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions); + assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups); + assertEquals(instancePartitions.getNumPartitions(), 1); + assertEquals(instancePartitions.getInstances(0, 0), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 12, SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 0, + SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 10)); + assertEquals(instancePartitions.getInstances(0, 1), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 7, SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 11, + SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 6)); + assertEquals(instancePartitions.getInstances(0, 2), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 22, SERVER_INSTANCE_ID_PREFIX + 23, SERVER_INSTANCE_ID_PREFIX + 19, + SERVER_INSTANCE_ID_PREFIX + 20, SERVER_INSTANCE_ID_PREFIX + 21)); } @Test @@ -1720,7 +1820,7 @@ public void testIllegalConfig() { InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, null); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new
InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig))); + new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false))); // No instance with correct tag try { @@ -1750,7 +1850,7 @@ public void testIllegalConfig() { // Enable pool tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, null); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig))); + new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false))); // No instance has correct pool configured try { @@ -1784,7 +1884,7 @@ public void testIllegalConfig() { tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 3, null); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig))); + new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false))); // Ask for too many pools try { @@ -1796,7 +1896,7 @@ public void testIllegalConfig() { tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, Arrays.asList(0, 2)); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig))); + new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false))); // Ask for pool that does not exist try { @@ -1810,7 +1910,7 @@ public void testIllegalConfig() { replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(false, 6, 0, 0, 0, 0, false, null ); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig))); + new 
InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false))); // Ask for too many instances try { @@ -1824,7 +1924,7 @@ public void testIllegalConfig() { replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 0, 0, 0, 0, false, null ); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig))); + new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false))); // Number of replica-groups must be positive try { @@ -1836,7 +1936,7 @@ public void testIllegalConfig() { replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 11, 0, 0, 0, false, null); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig))); + new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false))); // Ask for too many replica-groups try { @@ -1849,7 +1949,7 @@ public void testIllegalConfig() { replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 3, 3, 0, 0, false, null); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig))); + new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false))); // Ask for too many instances try { @@ -1861,7 +1961,7 @@ public void testIllegalConfig() { replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 3, 2, 0, 3, false, null); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig))); + new 
InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false))); // Ask for too many instances per partition try { @@ -1874,7 +1974,7 @@ public void testIllegalConfig() { replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 3, 2, 0, 0, false, null); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig))); + new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false))); // Math.abs("myTable_OFFLINE".hashCode()) % 5 = 3 // pool0: [i3, i4, i0, i1, i2] @@ -1914,7 +2014,8 @@ public void testIllegalConfig() { try { tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, "ILLEGAL_SELECTOR"))).build(); + new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, "ILLEGAL_SELECTOR", false))) + .build(); } catch (IllegalArgumentException e) { assertEquals(e.getMessage(), "No enum constant org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig.PartitionSelector" @@ -1943,7 +2044,8 @@ public void testIllegalConfig() { tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap( Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build(); + InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false))) + .build(); driver = new InstanceAssignmentDriver(tableConfig); try { instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, 
instanceConfigs, null); @@ -1976,7 +2078,8 @@ public void testIllegalConfig() { tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap( Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build(); + InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false))) + .build(); driver = new InstanceAssignmentDriver(tableConfig); try { instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null); @@ -2017,7 +2120,8 @@ public void testIllegalConfig() { tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap( Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build(); + InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false))) + .build(); driver = new InstanceAssignmentDriver(tableConfig); try { instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null); @@ -2055,7 +2159,8 @@ public void testPoolBasedFDAware() { TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build(); + InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false))) + .build(); InstanceAssignmentDriver driver = new 
InstanceAssignmentDriver(tableConfig); InstancePartitions instancePartitions = @@ -2127,7 +2232,8 @@ public void testPoolBasedFDAware() { tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap( Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build(); + InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), true))) + .build(); driver = new InstanceAssignmentDriver(tableConfig); // existingInstancePartitions = instancePartitions instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, instancePartitions); @@ -2208,7 +2314,7 @@ public void testPoolBasedFDAware() { tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap( Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))) + InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false))) .setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(partitionColumnName, numInstancesPerReplicaGroup)) .setSegmentPartitionConfig(segmentPartitionConfig).build(); driver = new InstanceAssignmentDriver(tableConfig); @@ -2282,7 +2388,7 @@ public void testPoolBasedFDAware() { new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT) .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig, - 
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))) + InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false))) .build(); driver = new InstanceAssignmentDriver(tableConfig); @@ -2338,7 +2444,7 @@ public void testPoolBasedFDAware() { new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT) .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))) + InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), true))) .build(); driver = new InstanceAssignmentDriver(tableConfig); @@ -2405,7 +2511,7 @@ public void testPoolBasedFDAware() { new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT) .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))) + InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false))) .build(); driver = new InstanceAssignmentDriver(tableConfig); @@ -2471,7 +2577,7 @@ public void testPoolBasedFDAware() { new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT) .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))) + 
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), true))) .build(); driver = new InstanceAssignmentDriver(tableConfig); @@ -2542,7 +2648,7 @@ public void testPoolBasedFDAware() { new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT) .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))) + InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false))) .build(); driver = new InstanceAssignmentDriver(tableConfig); @@ -2593,7 +2699,7 @@ public void testPoolBasedFDAware() { new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT) .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))) + InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false))) .build(); driver = new InstanceAssignmentDriver(tableConfig); @@ -2657,7 +2763,7 @@ public void testPoolBasedFDAware() { new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT) .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))) + InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), true))) .build(); driver = new 
InstanceAssignmentDriver(tableConfig); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java index 889206437fe6..288b789aee39 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java @@ -31,45 +31,79 @@ import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig; import org.testng.annotations.Test; +import static org.testng.Assert.assertEquals; + public class InstanceReplicaGroupPartitionSelectorTest { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + //@formatter:off private static final String INSTANCE_CONFIG_TEMPLATE = - "{\n" + " \"id\": \"Server_pinot-server-${serverName}.pinot-server-headless.pinot.svc.cluster.local_8098\",\n" - + " \"simpleFields\": {\n" + " \"HELIX_ENABLED\": \"true\",\n" - + " \"HELIX_ENABLED_TIMESTAMP\": \"1688959934305\",\n" - + " \"HELIX_HOST\": \"pinot-server-${serverName}.pinot-server-headless.pinot.svc.cluster.local\",\n" - + " \"HELIX_PORT\": \"8098\",\n" + " \"adminPort\": \"8097\",\n" + " \"grpcPort\": \"8090\",\n" - + " \"queryMailboxPort\": \"46347\",\n" + " \"queryServerPort\": \"45031\",\n" - + " \"shutdownInProgress\": \"false\"\n" + " },\n" + " \"mapFields\": {\n" - + " \"SYSTEM_RESOURCE_INFO\": {\n" + " \"numCores\": \"16\",\n" - + " \"totalMemoryMB\": \"126976\",\n" + " \"maxHeapSizeMB\": \"65536\"\n" + " },\n" - + " \"pool\": {\n" + " \"DefaultTenant_OFFLINE\": \"${pool}\",\n" - + " \"${poolName}\": \"${pool}\",\n" + " \"AllReplicationGroups\": \"1\"\n" + " }\n" + " },\n" - + " \"listFields\": {\n" + " 
\"TAG_LIST\": [\n" + " \"DefaultTenant_OFFLINE\",\n" - + " \"DefaultTenant_REALTIME\",\n" + " \"${poolName}\",\n" + " \"AllReplicationGroups\"\n" - + " ]\n" + " }\n" + "}"; + "{\n" + + " \"id\": \"Server_pinot-server-${serverName}.pinot-server-headless.pinot.svc.cluster.local_8098\",\n" + + " \"simpleFields\": {\n" + + " \"HELIX_ENABLED\": \"true\",\n" + + " \"HELIX_ENABLED_TIMESTAMP\": \"1688959934305\",\n" + + " \"HELIX_HOST\": \"pinot-server-${serverName}.pinot-server-headless.pinot.svc.cluster.local\",\n" + + " \"HELIX_PORT\": \"8098\",\n" + + " \"adminPort\": \"8097\",\n" + + " \"grpcPort\": \"8090\",\n" + + " \"queryMailboxPort\": \"46347\",\n" + + " \"queryServerPort\": \"45031\",\n" + + " \"shutdownInProgress\": \"false\"\n" + + " },\n" + + " \"mapFields\": {\n" + + " \"SYSTEM_RESOURCE_INFO\": {\n" + + " \"numCores\": \"16\",\n" + + " \"totalMemoryMB\": \"126976\",\n" + + " \"maxHeapSizeMB\": \"65536\"\n" + + " },\n" + + " \"pool\": {\n" + + " \"DefaultTenant_OFFLINE\": \"${pool}\",\n" + + " \"${poolName}\": \"${pool}\",\n" + + " \"AllReplicationGroups\": \"1\"\n" + + " }\n" + + " },\n" + + " \"listFields\": {\n" + + " \"TAG_LIST\": [\n" + + " \"DefaultTenant_OFFLINE\",\n" + + " \"DefaultTenant_REALTIME\",\n" + + " \"${poolName}\",\n" + + " \"AllReplicationGroups\"\n" + + " ]\n" + + " }\n" + + "}"; + //@formatter:on @Test - public void testSelectInstances() throws JsonProcessingException { - ObjectMapper objectMapper = new ObjectMapper(); + public void testPoolsWhenOneMorePoolAddedAndOneMoreReplicaGroupsNeeded() + throws JsonProcessingException { + //@formatter:off String existingPartitionsJson = - " {\n" + " \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n" - + " \"partitionToInstancesMap\": {\n" + " \"0_0\": [\n" - + " \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n" - + " \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n" - + " ]\n" + " }\n" + " }\n"; - 
InstancePartitions existing = objectMapper.readValue(existingPartitionsJson, InstancePartitions.class); + "{\n" + + " \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n" + + " \"partitionToInstancesMap\": {\n" + + " \"0_0\": [\n" + + " \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n" + + " \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n" + + " ]\n" + + " }\n" + + "}"; + //@formatter:on + InstancePartitions existing = OBJECT_MAPPER.readValue(existingPartitionsJson, InstancePartitions.class); InstanceReplicaGroupPartitionConfig config = new InstanceReplicaGroupPartitionConfig(true, 0, 2, 2, 1, 2, true, null); InstanceReplicaGroupPartitionSelector selector = - new InstanceReplicaGroupPartitionSelector(config, "tableNameBlah", existing); + new InstanceReplicaGroupPartitionSelector(config, "tableNameBlah", existing, true); String[] serverNames = {"rg0-0", "rg0-1", "rg1-0", "rg1-1"}; String[] poolNumbers = {"0", "0", "1", "1"}; - String[] poolNames = {"FirstHalfReplicationGroups", "FirstHalfReplicationGroups", "SecondHalfReplicationGroups", - "SecondHalfReplicationGroups"}; + String[] poolNames = { + "FirstHalfReplicationGroups", "FirstHalfReplicationGroups", "SecondHalfReplicationGroups", + "SecondHalfReplicationGroups" + }; Map> poolToInstanceConfigsMap = new HashMap<>(); for (int i = 0; i < serverNames.length; i++) { @@ -81,24 +115,103 @@ public void testSelectInstances() throws JsonProcessingException { StringSubstitutor substitutor = new StringSubstitutor(valuesMap); String resolvedString = substitutor.replace(INSTANCE_CONFIG_TEMPLATE); - ZNRecord znRecord = objectMapper.readValue(resolvedString, ZNRecord.class); + ZNRecord znRecord = OBJECT_MAPPER.readValue(resolvedString, ZNRecord.class); + int poolNumber = Integer.parseInt(poolNumbers[i]); + poolToInstanceConfigsMap.computeIfAbsent(poolNumber, k -> new ArrayList<>()).add(new InstanceConfig(znRecord)); + } + 
InstancePartitions assignedPartitions = new InstancePartitions("0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE"); + selector.selectInstances(poolToInstanceConfigsMap, assignedPartitions); + + // Now that 1 more pool is added and 1 more RG is needed, a new set called "0_1" is generated, + // and the instances from Pool 1 are assigned to this new replica. + //@formatter:off + String expectedInstancePartitions = + "{\n" + + " \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n" + + " \"partitionToInstancesMap\": {\n" + + " \"0_0\": [\n" + + " \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n" + + " \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n" + + " ],\n" + + " \"0_1\": [\n" + + " \"Server_pinot-server-rg1-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n" + + " \"Server_pinot-server-rg1-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n" + + " ]\n" + + " }\n" + + "}"; + //@formatter:on + InstancePartitions expectedPartitions = + OBJECT_MAPPER.readValue(expectedInstancePartitions, InstancePartitions.class); + assertEquals(assignedPartitions, expectedPartitions); + } + + @Test + public void testSelectPoolsWhenExistingReplicaGroupMapsToMultiplePools() + throws JsonProcessingException { + // The "rg0-2" instance used to belong to Pool 1, but now it belongs to Pool 0. 
+ //@formatter:off + String existingPartitionsJson = + "{\n" + + " \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n" + + " \"partitionToInstancesMap\": {\n" + + " \"0_0\": [\n" + + " \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n" + + " \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n" + + " ],\n" + + " \"0_1\": [\n" + + " \"Server_pinot-server-rg0-2.pinot-server-headless.pinot.svc.cluster.local_8098\",\n" + + " \"Server_pinot-server-rg1-0.pinot-server-headless.pinot.svc.cluster.local_8098\"\n" + + " ]\n" + + " }\n" + + "}"; + //@formatter:on + InstancePartitions existing = OBJECT_MAPPER.readValue(existingPartitionsJson, InstancePartitions.class); + InstanceReplicaGroupPartitionConfig config = + new InstanceReplicaGroupPartitionConfig(true, 0, 2, 2, 1, 2, true, null); + + InstanceReplicaGroupPartitionSelector selector = + new InstanceReplicaGroupPartitionSelector(config, "tableNameBlah", existing, true); + + String[] serverNames = {"rg0-0", "rg0-1", "rg0-2", "rg1-0", "rg1-1", "rg1-2"}; + String[] poolNumbers = {"0", "0", "0", "1", "1", "1"}; + Map> poolToInstanceConfigsMap = new HashMap<>(); + + for (int i = 0; i < serverNames.length; i++) { + Map valuesMap = new HashMap<>(); + valuesMap.put("serverName", serverNames[i]); + valuesMap.put("pool", poolNumbers[i]); + + StringSubstitutor substitutor = new StringSubstitutor(valuesMap); + String resolvedString = substitutor.replace(INSTANCE_CONFIG_TEMPLATE); + + ZNRecord znRecord = OBJECT_MAPPER.readValue(resolvedString, ZNRecord.class); int poolNumber = Integer.parseInt(poolNumbers[i]); poolToInstanceConfigsMap.computeIfAbsent(poolNumber, k -> new ArrayList<>()).add(new InstanceConfig(znRecord)); } + InstancePartitions assignedPartitions = new InstancePartitions("0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE"); selector.selectInstances(poolToInstanceConfigsMap, assignedPartitions); + // The "rg0-2" instance is replaced 
by "rg1-1" (which belongs to Pool 1), as "rg0-2" no longer belongs to Pool 1. + // And "rg1-0" remains in the same position as it has always been under Pool 1. + //@formatter:off String expectedInstancePartitions = - " {\n" + " \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n" - + " \"partitionToInstancesMap\": {\n" + " \"0_0\": [\n" - + " \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n" - + " \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n" - + " ],\n" + " \"0_1\": [\n" - + " \"Server_pinot-server-rg1-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n" - + " \"Server_pinot-server-rg1-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n" - + " ]\n" + " }\n" + " }\n"; + "{\n" + + " \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n" + + " \"partitionToInstancesMap\": {\n" + + " \"0_0\": [\n" + + " \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n" + + " \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n" + + " ],\n" + + " \"0_1\": [\n" + + " \"Server_pinot-server-rg1-1.pinot-server-headless.pinot.svc.cluster.local_8098\",\n" + + " \"Server_pinot-server-rg1-0.pinot-server-headless.pinot.svc.cluster.local_8098\"\n" + + " ]\n" + + " }\n" + + "}"; + //@formatter:on InstancePartitions expectedPartitions = - objectMapper.readValue(expectedInstancePartitions, InstancePartitions.class); - assert assignedPartitions.equals(expectedPartitions); + OBJECT_MAPPER.readValue(expectedInstancePartitions, InstancePartitions.class); + assertEquals(assignedPartitions, expectedPartitions); } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java index 5d679c03802d..1df7109ef288 100644 ---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java @@ -195,7 +195,7 @@ public void testRebalance() InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, NUM_REPLICAS, 0, 0, 0, false, null); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), - new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig))); + new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false))); _helixResourceManager.updateTableConfig(tableConfig); // No need to reassign instances because instances should be automatically assigned when updating the table config @@ -481,7 +481,7 @@ public void testRebalanceWithTiersAndInstanceAssignments() InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, NUM_REPLICAS, 0, 0, 0, false, null); tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(TIER_A_NAME, - new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig))); + new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false))); _helixResourceManager.updateTableConfig(tableConfig); rebalanceResult = tableRebalancer.rebalance(tableConfig, new RebalanceConfig(), null); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java index 391ba4812d3b..ad4b22ecaf70 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java +++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java @@ -41,13 +41,17 @@ public class InstanceAssignmentConfig extends BaseJsonConfig { "Configuration for the instance replica-group and partition of the instance assignment (mandatory)") private final InstanceReplicaGroupPartitionConfig _replicaGroupPartitionConfig; + @JsonPropertyDescription("Configuration to minimize data movement for pool and instance assignment") + private final boolean _minimizeDataMovement; + @JsonCreator public InstanceAssignmentConfig( @JsonProperty(value = "tagPoolConfig", required = true) InstanceTagPoolConfig tagPoolConfig, @JsonProperty("constraintConfig") @Nullable InstanceConstraintConfig constraintConfig, @JsonProperty(value = "replicaGroupPartitionConfig", required = true) InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig, - @JsonProperty("partitionSelector") @Nullable String partitionSelector) { + @JsonProperty("partitionSelector") @Nullable String partitionSelector, + @JsonProperty("minimizeDataMovement") boolean minimizeDataMovement) { Preconditions.checkArgument(tagPoolConfig != null, "'tagPoolConfig' must be configured"); Preconditions .checkArgument(replicaGroupPartitionConfig != null, "'replicaGroupPartitionConfig' must be configured"); @@ -57,11 +61,7 @@ public InstanceAssignmentConfig( _partitionSelector = partitionSelector == null ? 
PartitionSelector.INSTANCE_REPLICA_GROUP_PARTITION_SELECTOR : PartitionSelector.valueOf(partitionSelector); - } - - public InstanceAssignmentConfig(InstanceTagPoolConfig tagPoolConfig, InstanceConstraintConfig constraintConfig, - InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig) { - this(tagPoolConfig, constraintConfig, replicaGroupPartitionConfig, null); + _minimizeDataMovement = minimizeDataMovement; } public PartitionSelector getPartitionSelector() { @@ -81,6 +81,10 @@ public InstanceReplicaGroupPartitionConfig getReplicaGroupPartitionConfig() { return _replicaGroupPartitionConfig; } + public boolean isMinimizeDataMovement() { + return _minimizeDataMovement; + } + public enum PartitionSelector { FD_AWARE_INSTANCE_PARTITION_SELECTOR, INSTANCE_REPLICA_GROUP_PARTITION_SELECTOR, MIRROR_SERVER_SET_PARTITION_SELECTOR diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceReplicaGroupPartitionConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceReplicaGroupPartitionConfig.java index adc72e8f1ce1..1bc40cba212b 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceReplicaGroupPartitionConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceReplicaGroupPartitionConfig.java @@ -56,6 +56,8 @@ public class InstanceReplicaGroupPartitionConfig extends BaseJsonConfig { "Name of the column used for partition, if not provided table level replica group will be used") private final String _partitionColumn; + // TODO: remove this config in the next official release + @Deprecated private final boolean _minimizeDataMovement; @JsonCreator