Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Signed-off-by: Gantigmaa Selenge <[email protected]>
  • Loading branch information
tinaselenge committed Jul 2, 2024
1 parent 1e29cff commit fb03839
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import java.util.Collections;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.TopicListing;

Expand Down Expand Up @@ -454,16 +455,19 @@ private static State observe(Reconciliation reconciliation, PlatformClient platf
}

enum Plan {
// Used for brokers that are initially healthy and require neither restart not reconfigure
// Used for nodes that are initially healthy and require neither restart not reconfigure
NOP,
// Used for brokers that are initially not healthy
RESTART_FIRST,
// Used in {@link #initialPlan(List, RollClient)} for brokers that require reconfigure
// Used for nodes that are initially not running
RESTART_NOT_RUNNING,
// Used for nodes that are not responsive to connections
RESTART_UNRESPONSIVE,
// Used in {@link #initialPlan(List, RollClient)} for nodes that may require reconfigure
// before we know whether the actual config changes are reconfigurable
MAYBE_RECONFIGURE,
// Used in {@link #refinePlanForReconfigurability(Reconciliation, KafkaVersion, Function, String, RollClient, Map)}
// once we know a MAYBE_RECONFIGURE node can actually be reconfigured
RECONFIGURE,
// Used for nodes that have non-empty reasons to restart
RESTART,
// Used in {@link #initialPlan(List, RollClient)} for nodes that require waiting for
// log recovery to complete
Expand Down Expand Up @@ -737,8 +741,13 @@ public List<Integer> loop() throws TimeoutException, InterruptedException, Execu
}

// Restart any initially unready nodes
if (!byPlan.getOrDefault(Plan.RESTART_FIRST, List.of()).isEmpty()) {
return restartUnReadyNodes(byPlan.get(Plan.RESTART_FIRST), totalNumOfControllerNodes);
if (!byPlan.getOrDefault(Plan.RESTART_NOT_RUNNING, List.of()).isEmpty()) {
return restartNotRunningNodes(byPlan.get(Plan.RESTART_NOT_RUNNING), totalNumOfControllerNodes);
}

// Restart any initially unready nodes
if (!byPlan.getOrDefault(Plan.RESTART_UNRESPONSIVE, List.of()).isEmpty()) {
return restartUnresponsiveNodes(byPlan.get(Plan.RESTART_UNRESPONSIVE));
}

// If we get this far we know all nodes are ready
Expand Down Expand Up @@ -895,30 +904,37 @@ private List<Integer> waitForUnreadyNodes(List<Context> contexts, boolean ignore
return contexts.stream().map(Context::nodeId).collect(Collectors.toList());
}

private List<Integer> restartUnReadyNodes(List<Context> contexts, int totalNumOfControllers) throws TimeoutException {
private List<Integer> restartNotRunningNodes(List<Context> contexts, int totalNumOfControllers) throws TimeoutException {
Set<Context> notRunningControllers = contexts.stream().filter(context -> context.currentRoles().controller()).collect(Collectors.toSet());

if (totalNumOfControllers > 1 && totalNumOfControllers == notRunningControllers.size()) {
LOGGER.warnCr(reconciliation, "None of the controller nodes are running, therefore restarting them all now to to give the best chance to recover and form a quorum");
restartInParallel(reconciliation, time, platformClient, rollClient, agentClient, notRunningControllers, postOperationTimeoutMs, maxRestarts);
return notRunningControllers.stream().map(Context::nodeId).toList();
}

Set<Context> notRunningAndHasOldRevision = contexts.stream().filter(context -> context.state().equals(State.NOT_RUNNING) &&
context.reason().contains(RestartReason.POD_HAS_OLD_REVISION)).collect(Collectors.toSet());

if (notRunningAndHasOldRevision.size() > 0) {
restartInParallel(reconciliation, time, platformClient, rollClient, agentClient, notRunningAndHasOldRevision, postOperationTimeoutMs, maxRestarts);
return notRunningAndHasOldRevision.stream().map(Context::nodeId).toList();
}

return waitForUnreadyNodes(contexts, false);
}

private List<Integer> restartUnresponsiveNodes(List<Context> contexts) throws TimeoutException {
Set<Context> pureControllerNodesToRestart = new HashSet<>();
Set<Context> combinedNodesToRestart = new HashSet<>();
var notRunningCombinedNodes = 0;

for (var context : contexts) {
if (context.currentRoles().controller() && !context.currentRoles().broker()) {
pureControllerNodesToRestart.add(context);
} else if (context.currentRoles().controller() && context.currentRoles().broker()) {
combinedNodesToRestart.add(context);
if (context.state().equals(State.NOT_RUNNING)) {
notRunningCombinedNodes++;
}
}
}

if (totalNumOfControllers > 1 && totalNumOfControllers == notRunningCombinedNodes) {
LOGGER.warnCr(reconciliation, "All controller nodes are combined and they are not running, therefore restarting them all now");
// if all controller nodes (except a single node quorum) are combined and all of them are not running e.g. Pending, we need to restart them all at the same time to form the quorum.
// This is because until the quorum has been formed and broker process can connect to it, the combined nodes do not become ready.
restartInParallel(reconciliation, time, platformClient, rollClient, agentClient, combinedNodesToRestart, postOperationTimeoutMs, maxRestarts);
return combinedNodesToRestart.stream().map(Context::nodeId).toList();
}

// restart in the following order: pure controllers, combined nodes and brokers
Context nodeToRestart;
if (pureControllerNodesToRestart.size() > 0) {
Expand All @@ -929,36 +945,18 @@ private List<Integer> restartUnReadyNodes(List<Context> contexts, int totalNumOf
nodeToRestart = contexts.get(0);
}

if (nodeToRestart.state() == State.NOT_RUNNING && !nodeToRestart.reason().contains(RestartReason.POD_HAS_OLD_REVISION)) {
// If the node is not running (e.g. unschedulable) then restarting it, likely won't make any difference.
// Proceeding and deleting another node may result in it not running too. Avoid restarting it unless it has an old revision.
LOGGER.warnCr(reconciliation, "Node {} has been already restarted but still not running. Therefore will not restart it", nodeToRestart);
} else {
restartNode(reconciliation, time, platformClient, nodeToRestart, maxRestarts);
}
restartInParallel(reconciliation, time, platformClient, rollClient, agentClient, Collections.singleton(nodeToRestart), postOperationTimeoutMs, maxRestarts);

return Collections.singletonList(nodeToRestart.nodeId());

try {
long remainingTimeoutMs = awaitState(reconciliation, time, platformClient, agentClient, nodeToRestart, State.SERVING, postOperationTimeoutMs);
if (nodeToRestart.currentRoles().broker()) {
awaitPreferred(reconciliation, time, rollClient, nodeToRestart, remainingTimeoutMs);
}
} catch (TimeoutException e) {
LOGGER.warnCr(reconciliation, "Timed out waiting for node {} to become ready after a restart", nodeToRestart.nodeRef());
if (nodeToRestart.numAttempts() >= maxAttempts) {
LOGGER.warnCr(reconciliation, "Reached the maximum attempt of waiting for node {} to become ready after a restart", nodeToRestart.nodeRef());
throw e;
}
nodeToRestart.incrementNumAttempts();
}
return List.of(nodeToRestart.nodeId());
}

private Map<Plan, List<Context>> initialPlan(List<Context> contexts, RollClient rollClient) {
return contexts.stream().collect(Collectors.groupingBy(context -> {
if (context.state() == State.NOT_RUNNING) {
LOGGER.debugCr(reconciliation, "{} is in {} state therefore may get restarted first", context.nodeRef(), context.state());
context.reason().add(RestartReason.POD_STUCK);
return Plan.RESTART_FIRST;
return Plan.RESTART_NOT_RUNNING;

} else if (context.state() == State.RECOVERING) {
LOGGER.debugCr(reconciliation, "{} is in log recovery therefore will not be restarted", context.nodeRef());
Expand All @@ -967,7 +965,7 @@ private Map<Plan, List<Context>> initialPlan(List<Context> contexts, RollClient
} else if (!rollClient.canConnectToNode(context.nodeRef(), context.currentRoles().controller())) {
LOGGER.debugCr(reconciliation, "{} will be restarted because it does not seem to responding to connection attempt", context.nodeRef());
context.reason().add(RestartReason.POD_UNRESPONSIVE);
return Plan.RESTART_FIRST;
return Plan.RESTART_UNRESPONSIVE;

} else {
var reasons = context.reason();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1457,16 +1457,44 @@ public void shouldRollNodesIfAllNotRunning() throws ExecutionException, Interrup
true,
3);

// the order we expect are pure controller, combined and broker only
assertNodesRestarted(platformClient, rollClient, nodeRefs, rr, 0);

assertNodesRestarted(platformClient, rollClient, nodeRefs, rr, 1);
// we expect the controller nodes to be restarted together as none of the controllers are running
assertNodesRestarted(platformClient, rollClient, nodeRefs, rr, 0, 1);

assertNodesRestarted(platformClient, rollClient, nodeRefs, rr, 2);

assertNodesRestarted(platformClient, rollClient, nodeRefs, rr);
}

@Test
public void shouldRollNodeIfNotRunningAndHasOldRevision() throws ExecutionException, InterruptedException, TimeoutException {
// given
PlatformClient platformClient = mock(PlatformClient.class);
RollClient rollClient = mock(RollClient.class);
AgentClient agentClient = mock(AgentClient.class);
var nodeRefs = new MockBuilder()
.addNodes(platformClient, true, true, 0)
.mockLeader(rollClient, 0)
.addTopic("topic-A", 0)
.mockCanConnectToNodes(rollClient, true, 0)
.mockNodeState(platformClient, List.of(PlatformClient.NodeState.NOT_RUNNING, PlatformClient.NodeState.READY), 0)
.mockTopics(rollClient)
.done();

var rr = newRollingRestart(platformClient,
rollClient,
agentClient,
nodeRefs.values(),
RackRollingTest::podHasOldRevision,
EMPTY_CONFIG_SUPPLIER,
true,
3);

// we expect the controller nodes to be restarted together as none of the controllers are running
assertNodesRestarted(platformClient, rollClient, nodeRefs, rr, 0);

assertNodesRestarted(platformClient, rollClient, nodeRefs, rr);
}

@Test
public void shouldRestartCombinedNodesIfAllNotRunning() throws ExecutionException, InterruptedException, TimeoutException {
// given
Expand Down Expand Up @@ -1508,17 +1536,15 @@ void shouldFailReconciliationIfControllerNodeNeverBecomeReady() {
RollClient rollClient = mock(RollClient.class);
AgentClient agentClient = mock(AgentClient.class);
var nodeRefs = new MockBuilder()
.addNode(platformClient, false, true, 0)
.addNode(platformClient, true, false, 1)
.mockNodeState(platformClient, List.of(PlatformClient.NodeState.NOT_RUNNING), 0)
.mockNodeState(platformClient, List.of(PlatformClient.NodeState.NOT_RUNNING), 1)
.mockTopics(rollClient)
.done();

var ex = assertThrows(TimeoutException.class,
var ex = assertThrows(RuntimeException.class,
() -> doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::noReasons, EMPTY_CONFIG_SUPPLIER, 1, 3));

assertEquals("Failed to reach SERVING within 120000 ms: Context[nodeRef=pool-kafka-1/1, currentRoles=NodeRoles[controller=true, broker=false], state=NOT_RUNNING, lastTransition=1970-01-01T00:00:00Z, reason=[POD_STUCK], numRestarts=0, numReconfigs=0, numAttempts=2]", ex.getMessage());
assertEquals("java.util.concurrent.TimeoutException: Failed to reach SERVING within 120000 ms: Context[nodeRef=pool-kafka-1/1, currentRoles=NodeRoles[controller=true, broker=false], state=NOT_RUNNING, lastTransition=1970-01-01T00:00:00Z, reason=[POD_STUCK], numRestarts=0, numReconfigs=0, numAttempts=2]", ex.getMessage());

Mockito.verify(rollClient, never()).reconfigureNode(any(), any(), any());
Mockito.verify(platformClient, times(0)).restartNode(eq(nodeRefs.get(1)), any());
Expand All @@ -1543,7 +1569,7 @@ void shouldFailReconciliationIfBrokerNodeNeverBecomeReady() {
var ex = assertThrows(TimeoutException.class,
() -> doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::noReasons, EMPTY_CONFIG_SUPPLIER, 1, 3));

assertEquals("Failed to reach SERVING within 120000 ms: Context[nodeRef=pool-kafka-0/0, currentRoles=NodeRoles[controller=false, broker=true], state=NOT_READY, lastTransition=1970-01-01T00:10:19Z, reason=[POD_STUCK], numRestarts=2, numReconfigs=0, numAttempts=2]", ex.getMessage());
assertEquals("Failed to reach SERVING within 120000 ms: Context[nodeRef=pool-kafka-0/0, currentRoles=NodeRoles[controller=false, broker=true], state=NOT_READY, lastTransition=1970-01-01T00:10:17Z, reason=[POD_STUCK], numRestarts=2, numReconfigs=0, numAttempts=2]", ex.getMessage());

Mockito.verify(rollClient, never()).reconfigureNode(any(), any(), any());
Mockito.verify(platformClient, times(1)).restartNode(eq(nodeRefs.get(1)), any());
Expand Down

0 comments on commit fb03839

Please sign in to comment.