Make scheduling of remotely accessible splits with addresses more strict
UniformNodeSelector and the FTE scheduler will schedule remotely accessible
splits on the selected nodes if such nodes are available, and only fall back
to other nodes if the preferred nodes are no longer part of the cluster. A
connector might have stale node information when creating splits, which could
result in choosing offline nodes. Additionally, in FTE mode nodes can go down,
so split addresses may no longer be valid when a task is restarted.

Additionally, this commit simplifies the UniformNodeSelector
optimizedLocalScheduling logic, which was hard to reason about and did not
take advantage of recent improvements such as the adaptive split queue length.
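
As a rough illustration of the stricter behavior (a sketch, not the actual
Trino code), the snippet below shows the candidate-selection order described
above for a remotely accessible split that carries addresses: prefer the
addressed nodes while they are still online, otherwise fall back to a random
sample of online nodes. The names onlineNodes, splitAddresses and
minCandidates are hypothetical stand-ins for Trino's NodeMap,
Split.getAddresses() and the node-selector configuration.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

class CandidateSelectionSketch
{
    static List<String> candidateNodes(Set<String> onlineNodes, List<String> splitAddresses, int minCandidates)
    {
        // Prefer the nodes named in the split's addresses, but only those still online.
        List<String> exact = splitAddresses.stream()
                .filter(onlineNodes::contains)
                .toList();
        if (!exact.isEmpty()) {
            return exact;
        }
        // Preferred nodes are gone (stale connector info, or an FTE restart after a
        // node failure): fall back to a random sample of online nodes instead of failing.
        List<String> shuffled = new ArrayList<>(onlineNodes);
        Collections.shuffle(shuffled);
        return shuffled.subList(0, Math.min(minCandidates, shuffled.size()));
    }
}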

Co-authored-by: Karol Sobczak <[email protected]>
sopel39 committed Jun 11, 2024
1 parent c946e6d commit 2bedffb
Showing 13 changed files with 255 additions and 506 deletions.
@@ -20,27 +20,19 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.collect.SetMultimap;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.log.Logger;
import io.trino.execution.NodeTaskMap;
import io.trino.execution.RemoteTask;
import io.trino.execution.resourcegroups.IndexedPriorityQueue;
import io.trino.execution.scheduler.NodeSchedulerConfig.SplitsBalancingPolicy;
import io.trino.metadata.InternalNode;
import io.trino.metadata.InternalNodeManager;
import io.trino.metadata.Split;
import io.trino.spi.HostAddress;
import io.trino.spi.SplitWeight;
import io.trino.spi.TrinoException;
import jakarta.annotation.Nullable;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -49,7 +41,6 @@
import java.util.function.Supplier;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.execution.scheduler.NodeScheduler.calculateLowWatermark;
import static io.trino.execution.scheduler.NodeScheduler.filterNodes;
import static io.trino.execution.scheduler.NodeScheduler.getAllNodes;
@@ -59,7 +50,6 @@
import static io.trino.execution.scheduler.NodeScheduler.selectNodes;
import static io.trino.execution.scheduler.NodeScheduler.toWhenHasSplitQueueSpaceFuture;
import static io.trino.spi.StandardErrorCode.NO_NODES_AVAILABLE;
import static java.util.Comparator.comparingLong;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.SECONDS;

@@ -168,47 +158,37 @@ public SplitPlacementResult computeAssignments(Set<Split> splits, List<RemoteTas
queueSizeAdjuster.update(existingTasks, assignmentStats);
Set<InternalNode> blockedExactNodes = new HashSet<>();
boolean splitWaitingForAnyNode = false;
// splitsToBeRedistributed becomes true only when splits go through locality-based assignment
boolean splitsToBeRedistributed = false;
Set<Split> remainingSplits = new HashSet<>(splits.size());

List<InternalNode> filteredNodes = filterNodes(nodeMap, includeCoordinator, ImmutableSet.of());
ResettableRandomizedIterator<InternalNode> randomCandidates = new ResettableRandomizedIterator<>(filteredNodes);
Set<InternalNode> schedulableNodes = new HashSet<>(filteredNodes);

// optimizedLocalScheduling enables prioritized assignment of splits to local nodes when splits contain locality information
if (optimizedLocalScheduling) {
for (Split split : splits) {
if (split.isRemotelyAccessible() && !split.getAddresses().isEmpty()) {
List<InternalNode> candidateNodes = selectExactNodes(nodeMap, split.getAddresses(), includeCoordinator);

Optional<InternalNode> chosenNode = candidateNodes.stream()
.filter(ownerNode -> assignmentStats.getTotalSplitsWeight(ownerNode) < maxSplitsWeightPerNode && assignmentStats.getUnacknowledgedSplitCountForStage(ownerNode) < maxUnacknowledgedSplitsPerTask)
.min(comparingLong(assignmentStats::getTotalSplitsWeight));

if (chosenNode.isPresent()) {
assignment.put(chosenNode.get(), split);
assignmentStats.addAssignedSplit(chosenNode.get(), split.getSplitWeight());
splitsToBeRedistributed = true;
continue;
}
}
remainingSplits.add(split);
}
}
else {
remainingSplits = splits;
}

for (Split split : remainingSplits) {
for (Split split : splits) {
randomCandidates.reset();

List<InternalNode> candidateNodes;
boolean exactNodes;
if (!split.isRemotelyAccessible()) {
candidateNodes = selectExactNodes(nodeMap, split.getAddresses(), includeCoordinator);
exactNodes = true;
}
else {
candidateNodes = selectNodes(minCandidates, randomCandidates);
// optimizedLocalScheduling enables prioritized assignment of splits to local nodes when splits contain locality information
if (optimizedLocalScheduling && !split.getAddresses().isEmpty()) {
candidateNodes = selectExactNodes(nodeMap, split.getAddresses(), includeCoordinator);
if (candidateNodes.isEmpty()) {
// choose any other node if preferred node is not available
candidateNodes = selectNodes(minCandidates, randomCandidates);
exactNodes = false;
}
else {
exactNodes = true;
}
}
else {
candidateNodes = selectNodes(minCandidates, randomCandidates);
exactNodes = false;
}
}
if (candidateNodes.isEmpty()) {
log.debug("No nodes available to schedule %s. Available nodes %s", split, nodeMap.getNodesByHost().keys());
@@ -238,7 +218,7 @@
}
else {
candidateNodes.forEach(schedulableNodes::remove);
if (split.isRemotelyAccessible()) {
if (!exactNodes) {
splitWaitingForAnyNode = true;
}
// Exact node set won't matter, if a split is waiting for any node
@@ -261,9 +241,6 @@ else if (!splitWaitingForAnyNode) {
blocked = toWhenHasSplitQueueSpaceFuture(blockedExactNodes, existingTasks, calculateLowWatermark(minPendingSplitsWeightPerTask));
}

if (splitsToBeRedistributed) {
equateDistribution(assignment, assignmentStats, nodeMap, includeCoordinator);
}
return new SplitPlacementResult(blocked, assignment);
}

@@ -318,129 +295,6 @@ private List<InternalNode> getFreeNodesForStage(NodeAssignmentStats assignmentSt
return freeNodes.build();
}

/**
* The method tries to make the distribution of splits more uniform. All nodes are arranged into a maxHeap and a minHeap
* based on the number of splits that are assigned to them. Splits are redistributed, one at a time, from a maxNode to a
* minNode until we have as uniform a distribution as possible.
*
* @param assignment the node-splits multimap after the first and the second stage
* @param assignmentStats required to obtain info regarding splits assigned to a node outside the current batch of assignment
* @param nodeMap to get a list of all nodes to which splits can be assigned
*/
private void equateDistribution(Multimap<InternalNode, Split> assignment, NodeAssignmentStats assignmentStats, NodeMap nodeMap, boolean includeCoordinator)
{
if (assignment.isEmpty()) {
return;
}

Collection<InternalNode> allNodes = nodeMap.getNodesByHostAndPort().values().stream()
.filter(node -> includeCoordinator || !nodeMap.getCoordinatorNodeIds().contains(node.getNodeIdentifier()))
.collect(toImmutableList());

if (allNodes.size() < 2) {
return;
}

IndexedPriorityQueue<InternalNode> maxNodes = new IndexedPriorityQueue<>();
for (InternalNode node : assignment.keySet()) {
maxNodes.addOrUpdate(node, assignmentStats.getTotalSplitsWeight(node));
}

IndexedPriorityQueue<InternalNode> minNodes = new IndexedPriorityQueue<>();
for (InternalNode node : allNodes) {
minNodes.addOrUpdate(node, Long.MAX_VALUE - assignmentStats.getTotalSplitsWeight(node));
}

while (true) {
if (maxNodes.isEmpty()) {
return;
}

// fetch min and max node
InternalNode maxNode = maxNodes.poll();
InternalNode minNode = minNodes.poll();

// Allow some degree of non uniformity when assigning splits to nodes. Usually data distribution
// among nodes in a cluster won't be fully uniform (e.g. because hash function with non-uniform
// distribution is used like consistent hashing). In such case it makes sense to assign splits to nodes
// with data because of potential savings in network throughput and CPU time.
// The difference of 5 between node with maximum and minimum splits is a tradeoff between ratio of
// misassigned splits and assignment uniformity. Using larger numbers doesn't reduce the number of
// misassigned splits greatly (in absolute values).
if (assignmentStats.getTotalSplitsWeight(maxNode) - assignmentStats.getTotalSplitsWeight(minNode) <= SplitWeight.rawValueForStandardSplitCount(5)) {
return;
}

// move split from max to min
Split redistributed = redistributeSplit(assignment, maxNode, minNode, nodeMap.getNodesByHost());
assignmentStats.removeAssignedSplit(maxNode, redistributed.getSplitWeight());
assignmentStats.addAssignedSplit(minNode, redistributed.getSplitWeight());

// add max back into maxNodes only if it still has assignments
if (assignment.containsKey(maxNode)) {
maxNodes.addOrUpdate(maxNode, assignmentStats.getTotalSplitsWeight(maxNode));
}

// Add or update both the Priority Queues with the updated node priorities
maxNodes.addOrUpdate(minNode, assignmentStats.getTotalSplitsWeight(minNode));
minNodes.addOrUpdate(minNode, Long.MAX_VALUE - assignmentStats.getTotalSplitsWeight(minNode));
minNodes.addOrUpdate(maxNode, Long.MAX_VALUE - assignmentStats.getTotalSplitsWeight(maxNode));
}
}

/**
* The method selects and removes a split from the fromNode and assigns it to the toNode. There is an attempt to
* redistribute a Non-local split if possible. This case is possible when there are multiple queries running
* simultaneously. If a Non-local split cannot be found in the maxNode, any split is selected randomly and reassigned.
*/
@VisibleForTesting
public static Split redistributeSplit(Multimap<InternalNode, Split> assignment, InternalNode fromNode, InternalNode toNode, SetMultimap<InetAddress, InternalNode> nodesByHost)
{
Iterator<Split> splitIterator = assignment.get(fromNode).iterator();
Split splitToBeRedistributed = null;
while (splitIterator.hasNext()) {
Split split = splitIterator.next();
// Try to select non-local split for redistribution
if (!split.getAddresses().isEmpty() && !isSplitLocal(split.getAddresses(), fromNode.getHostAndPort(), nodesByHost)) {
splitToBeRedistributed = split;
break;
}
}
// Select any split if maxNode has no non-local splits in the current batch of assignment
if (splitToBeRedistributed == null) {
splitIterator = assignment.get(fromNode).iterator();
splitToBeRedistributed = splitIterator.next();
}
splitIterator.remove();
assignment.put(toNode, splitToBeRedistributed);
return splitToBeRedistributed;
}

/**
* Helper method to determine if a split is local to a node irrespective of whether splitAddresses contain port information or not
*/
private static boolean isSplitLocal(List<HostAddress> splitAddresses, HostAddress nodeAddress, SetMultimap<InetAddress, InternalNode> nodesByHost)
{
for (HostAddress address : splitAddresses) {
if (nodeAddress.equals(address)) {
return true;
}
InetAddress inetAddress;
try {
inetAddress = address.toInetAddress();
}
catch (UnknownHostException e) {
continue;
}
if (!address.hasPort()) {
Set<InternalNode> localNodes = nodesByHost.get(inetAddress);
return localNodes.stream()
.anyMatch(node -> node.getHostAndPort().equals(nodeAddress));
}
}
return false;
}

static class QueueSizeAdjuster
{
private static final long SCALE_DOWN_INTERVAL = SECONDS.toNanos(1);