Skip to content

Commit

Permalink
Discovery: wait on incoming joins before electing local node as master
Browse files Browse the repository at this point in the history
During master election each node pings in order to discover other nodes and validate the liveness of existing nodes. Based on this information the node either discovers an existing master or, if enough nodes are found (based on `discovery.zen.minimum_master_nodes>>) a new master will be elected.

Currently, the node that is elected as master will currently update it the cluster state to indicate the result of the election. Other nodes will submit a join request to the newly elected master node. Instead of immediately processing the election result, the elected master
node should wait for the incoming joins from other nodes, thus validating the elections result is properly applied. As soon as enough nodes have sent their joins request (based on the `minimum_master_nodes` settings) the cluster state is modified.

Note that if `minimum_master_nodes` is not set, this change has no effect.

Closes elastic#12161
  • Loading branch information
bleskes committed Jul 15, 2015
1 parent 4804bf7 commit 1e35bf3
Show file tree
Hide file tree
Showing 11 changed files with 1,078 additions and 156 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ long getRegisteredNextDelaySetting() {
}

// visible for testing
void performReroute(String reason) {
protected void performReroute(String reason) {
try {
if (lifecycle.stopped()) {
return;
Expand Down

Large diffs are not rendered by default.

163 changes: 35 additions & 128 deletions core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.elasticsearch.cluster.settings.DynamicSettings;
import org.elasticsearch.cluster.settings.Validator;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.inject.Inject;
Expand Down Expand Up @@ -93,6 +92,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
public final static String SETTING_MAX_PINGS_FROM_ANOTHER_MASTER = "discovery.zen.max_pings_from_another_master";
public final static String SETTING_SEND_LEAVE_REQUEST = "discovery.zen.send_leave_request";
public final static String SETTING_MASTER_ELECTION_FILTER_CLIENT = "discovery.zen.master_election.filter_client";
public final static String SETTING_MASTER_ELECTION_WAIT_FOR_JOINS_TIMEOUT = "discovery.zen.master_election.wait_for_joins_timeout";
public final static String SETTING_MASTER_ELECTION_FILTER_DATA = "discovery.zen.master_election.filter_data";

public static final String DISCOVERY_REJOIN_ACTION_NAME = "internal:discovery/zen/rejoin";
Expand Down Expand Up @@ -126,6 +126,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen

private final boolean masterElectionFilterClientNodes;
private final boolean masterElectionFilterDataNodes;
private final TimeValue masterElectionWaitForJoinsTimeout;


private final CopyOnWriteArrayList<InitialStateDiscoveryListener> initialStateListeners = new CopyOnWriteArrayList<>();
Expand All @@ -142,7 +143,9 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
@Nullable
private NodeService nodeService;

private final BlockingQueue<Tuple<DiscoveryNode, MembershipAction.JoinCallback>> processJoinRequests = ConcurrentCollections.newBlockingQueue();

// must initialized in doStart(), when we have the routingService set
private volatile NodeJoinController nodeJoinController;

@Inject
public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threadPool,
Expand All @@ -169,6 +172,7 @@ public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threa

this.masterElectionFilterClientNodes = settings.getAsBoolean(SETTING_MASTER_ELECTION_FILTER_CLIENT, true);
this.masterElectionFilterDataNodes = settings.getAsBoolean(SETTING_MASTER_ELECTION_FILTER_DATA, false);
this.masterElectionWaitForJoinsTimeout = settings.getAsTime(SETTING_MASTER_ELECTION_WAIT_FOR_JOINS_TIMEOUT, TimeValue.timeValueMillis(joinTimeout.millis() / 2));
this.rejoinOnMasterGone = settings.getAsBoolean(SETTING_REJOIN_ON_MASTER_GONE, true);

if (this.joinRetryAttempts < 1) {
Expand Down Expand Up @@ -230,6 +234,7 @@ protected void doStart() {
nodesFD.setLocalNode(clusterService.localNode());
joinThreadControl.start();
pingService.start();
this.nodeJoinController = new NodeJoinController(clusterService, routingService, discoverySettings, settings);

// start the join thread from a cluster state update. See {@link JoinThreadControl} for details.
clusterService.submitStateUpdateTask("initial_join", new ClusterStateNonMasterUpdateTask() {
Expand Down Expand Up @@ -353,6 +358,7 @@ public boolean joiningCluster() {
private void innerJoinCluster() {
DiscoveryNode masterNode = null;
final Thread currentThread = Thread.currentThread();
nodeJoinController.startAccumulatingJoins();
while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) {
masterNode = findMaster();
}
Expand All @@ -363,52 +369,32 @@ private void innerJoinCluster() {
}

if (clusterService.localNode().equals(masterNode)) {
clusterService.submitStateUpdateTask("zen-disco-join (elected_as_master)", Priority.IMMEDIATE, new ProcessedClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
// Take into account the previous known nodes, if they happen not to be available
// then fault detection will remove these nodes.

if (currentState.nodes().masterNode() != null) {
// TODO can we tie break here? we don't have a remote master cluster state version to decide on
logger.trace("join thread elected local node as master, but there is already a master in place: {}", currentState.nodes().masterNode());
return currentState;
}

DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder(currentState.nodes()).masterNodeId(currentState.nodes().localNode().id());
// update the fact that we are the master...
ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(discoverySettings.getNoMasterBlock()).build();
currentState = ClusterState.builder(currentState).nodes(builder).blocks(clusterBlocks).build();

// eagerly run reroute to remove dead nodes from routing table
RoutingAllocation.Result result = routingService.getAllocationService().reroute(currentState);
return ClusterState.builder(currentState).routingResult(result).build();
}

@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
}
final int requiredJoins = Math.max(0, electMaster.minimumMasterNodes() - 1); // we count as one
logger.debug("elected as master, waiting for incoming joins ([{}] needed)", requiredJoins);
nodeJoinController.waitToBeElectedAsMaster(requiredJoins, masterElectionWaitForJoinsTimeout,
new NodeJoinController.Callback() {
@Override
public void onElectedAsMaster(ClusterState state) {
joinThreadControl.markThreadAsDone(currentThread);
// we only starts nodesFD if we are master (it may be that we received a cluster state while pinging)
nodesFD.updateNodesAndPing(state); // start the nodes FD
sendInitialStateEventIfNeeded();
long count = clusterJoinsCounter.incrementAndGet();
logger.trace("cluster joins counter set to [{}] (elected as master)", count);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (newState.nodes().localNodeMaster()) {
// we only starts nodesFD if we are master (it may be that we received a cluster state while pinging)
joinThreadControl.markThreadAsDone(currentThread);
nodesFD.updateNodesAndPing(newState); // start the nodes FD
} else {
// if we're not a master it means another node published a cluster state while we were pinging
// make sure we go through another pinging round and actively join it
joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
@Override
public void onFailure(Throwable t) {
logger.trace("failed while waiting for nodes to join, rejoining", t);
joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
}
}
sendInitialStateEventIfNeeded();
long count = clusterJoinsCounter.incrementAndGet();
logger.trace("cluster joins counter set to [{}] (elected as master)", count);

}
});
);
} else {
// process any incoming joins (they will fail because we are not the master)
nodeJoinController.stopAccumulatingJoins();

// send join request
final boolean success = joinElectedMaster(masterNode);

Expand Down Expand Up @@ -878,7 +864,7 @@ static boolean shouldIgnoreOrRejectNewClusterState(ESLogger logger, ClusterState
}
if (!currentState.nodes().masterNodeId().equals(newClusterState.nodes().masterNodeId())) {
logger.warn("received a cluster state from a different master then the current one, rejecting (received {}, current {})", newClusterState.nodes().masterNode(), currentState.nodes().masterNode());
throw new IllegalStateException("cluster state from a different master then the current one, rejecting (received " + newClusterState.nodes().masterNode() + ", current " + currentState.nodes().masterNode() + ")");
throw new IllegalStateException("cluster state from a different master than the current one, rejecting (received " + newClusterState.nodes().masterNode() + ", current " + currentState.nodes().masterNode() + ")");
} else if (newClusterState.version() < currentState.version()) {
// if the new state has a smaller version, and it has the same master node, then no need to process it
logger.debug("received a cluster state that has a lower version than the current one, ignoring (received {}, current {})", newClusterState.version(), currentState.version());
Expand All @@ -893,6 +879,8 @@ void handleJoinRequest(final DiscoveryNode node, final MembershipAction.JoinCall
if (!transportService.addressSupported(node.address().getClass())) {
// TODO, what should we do now? Maybe inform that node that its crap?
logger.warn("received a wrong address type from [{}], ignoring...", node);
} else if (nodeJoinController == null) {
throw new IllegalStateException("discovery module is not yet started");
} else {
// The minimum supported version for a node joining a master:
Version minimumNodeJoinVersion = localNode().getVersion().minimumCompatibilityVersion();
Expand All @@ -910,88 +898,7 @@ void handleJoinRequest(final DiscoveryNode node, final MembershipAction.JoinCall
// validate the join request, will throw a failure if it fails, which will get back to the
// node calling the join request
membership.sendValidateJoinRequestBlocking(node, joinTimeout);
processJoinRequests.add(new Tuple<>(node, callback));
clusterService.submitStateUpdateTask("zen-disco-receive(join from node[" + node + "])", Priority.URGENT, new ProcessedClusterStateUpdateTask() {

private final List<Tuple<DiscoveryNode, MembershipAction.JoinCallback>> drainedJoinRequests = new ArrayList<>();
private boolean nodeAdded = false;

@Override
public ClusterState execute(ClusterState currentState) {
processJoinRequests.drainTo(drainedJoinRequests);
if (drainedJoinRequests.isEmpty()) {
return currentState;
}

DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(currentState.nodes());
for (Tuple<DiscoveryNode, MembershipAction.JoinCallback> task : drainedJoinRequests) {
DiscoveryNode node = task.v1();
if (currentState.nodes().nodeExists(node.id())) {
logger.debug("received a join request for an existing node [{}]", node);
} else {
nodeAdded = true;
nodesBuilder.put(node);
for (DiscoveryNode existingNode : currentState.nodes()) {
if (node.address().equals(existingNode.address())) {
nodesBuilder.remove(existingNode.id());
logger.warn("received join request from node [{}], but found existing node {} with same address, removing existing node", node, existingNode);
}
}
}
}


// we must return a new cluster state instance to force publishing. This is important
// for the joining node to finalize it's join and set us as a master
final ClusterState.Builder newState = ClusterState.builder(currentState);
if (nodeAdded) {
newState.nodes(nodesBuilder);
}

return newState.build();
}

@Override
public void onNoLongerMaster(String source) {
// we are rejected, so drain all pending task (execute never run)
processJoinRequests.drainTo(drainedJoinRequests);
Exception e = new NotMasterException("Node [" + clusterService.localNode() + "] not master for join request from [" + node + "]");
innerOnFailure(e);
}

void innerOnFailure(Throwable t) {
for (Tuple<DiscoveryNode, MembershipAction.JoinCallback> drainedTask : drainedJoinRequests) {
try {
drainedTask.v2().onFailure(t);
} catch (Exception e) {
logger.error("error during task failure", e);
}
}
}

@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
innerOnFailure(t);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (nodeAdded) {
// we reroute not in the same cluster state update since in certain areas we rely on
// the node to be in the cluster state (sampled from ClusterService#state) to be there, also
// shard transitions need to better be handled in such cases
routingService.reroute("post_node_add");
}
for (Tuple<DiscoveryNode, MembershipAction.JoinCallback> drainedTask : drainedJoinRequests) {
try {
drainedTask.v2().onSuccess();
} catch (Exception e) {
logger.error("unexpected error during [{}]", e, source);
}
}
}
});
nodeJoinController.handleJoinRequest(node, callback);
}
}

Expand Down Expand Up @@ -1404,4 +1311,4 @@ private void assertClusterStateThread() {
}

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public boolean hasReroutedAndClear() {
}

@Override
void performReroute(String reason) {
protected void performReroute(String reason) {
rerouted.set(true);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,12 +240,16 @@ public void failWithMinimumMasterNodesConfigured() throws Exception {
/** Verify that nodes fault detection works after master (re) election */
@Test
public void testNodesFDAfterMasterReelection() throws Exception {
startCluster(3);
startCluster(4);

logger.info("stopping current master");
logger.info("--> stopping current master");
internalCluster().stopCurrentMasterNode();

ensureStableCluster(2);
ensureStableCluster(3);

logger.info("--> reducing min master nodes to 2");
assertAcked(client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES, 2)).get());

String master = internalCluster().getMasterName();
String nonMaster = null;
Expand All @@ -259,7 +263,7 @@ public void testNodesFDAfterMasterReelection() throws Exception {
addRandomIsolation(nonMaster).startDisrupting();

logger.info("--> waiting for master to remove it");
ensureStableCluster(1, master);
ensureStableCluster(2, master);
}

/**
Expand Down Expand Up @@ -703,12 +707,13 @@ public void run() {
}

/**
* Test that a document which is indexed on the majority side of a partition, is available from the minory side,
* Test that a document which is indexed on the majority side of a partition, is available from the minority side,
* once the partition is healed
*
* @throws Exception
*/
@Test
@TestLogging(value = "cluster.service:TRACE")
public void testRejoinDocumentExistsInAllShardCopies() throws Exception {
List<String> nodes = startCluster(3);

Expand Down
Loading

0 comments on commit 1e35bf3

Please sign in to comment.