Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for deserilization bug in weighted round robin metadata #11679

Merged
merged 8 commits into from
Feb 12, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -105,6 +105,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Removed

### Fixed
- Fix for deserilization bug in weighted round-robin metadata ([#11679](https://github.com/opensearch-project/OpenSearch/pull/11679))

### Security

Original file line number Diff line number Diff line change
@@ -13,12 +13,14 @@
import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse;
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.discovery.ClusterManagerNotDiscoveredException;
import org.opensearch.plugins.Plugin;
import org.opensearch.snapshots.mockstore.MockRepository;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.disruption.NetworkDisruption;
import org.opensearch.test.transport.MockTransportService;
@@ -715,4 +717,144 @@ public void testClusterHealthResponseWithEnsureNodeWeighedInParam() throws Excep
assertFalse(nodeLocalHealth.isTimedOut());
assertTrue(nodeLocalHealth.hasDiscoveredClusterManager());
}

public void testReadWriteWeightedRoutingMetadataOnNodeRestart() throws Exception {
anshu1106 marked this conversation as resolved.
Show resolved Hide resolved
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.build();

internalCluster().startClusterManagerOnlyNode(Settings.builder().put(commonSettings).build());

logger.info("--> starting 1 nodes on zones 'a' & 'b' & 'c'");
List<String> nodes_in_zone_a = internalCluster().startDataOnlyNodes(
1,
Settings.builder().put(commonSettings).put("node.attr.zone", "a").build()
);
List<String> nodes_in_zone_b = internalCluster().startDataOnlyNodes(
1,
Settings.builder().put(commonSettings).put("node.attr.zone", "b").build()
);
List<String> nodes_in_zone_c = internalCluster().startDataOnlyNodes(
1,
Settings.builder().put(commonSettings).put("node.attr.zone", "c").build()
);

logger.info("--> waiting for nodes to form a cluster");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("4").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));

ensureGreen();

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 1.0, "b", 2.0, "c", 3.0);
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);

ClusterPutWeightedRoutingResponse response = client().admin()
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertEquals(response.isAcknowledged(), true);

ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().setVersion(0).get();
assertTrue(deleteResponse.isAcknowledged());

// check weighted routing metadata after node restart, ensure node comes healthy after restart
internalCluster().restartNode(nodes_in_zone_a.get(0), new InternalTestCluster.RestartCallback());
ensureGreen();
assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());

// make sure restarted node joins the cluster
assertEquals(3, internalCluster().clusterService().state().nodes().getDataNodes().size());
assertNotNull(
internalCluster().client(nodes_in_zone_a.get(0))
.admin()
.cluster()
.state(new ClusterStateRequest().local(true))
.get()
.getState()
.metadata()
.weightedRoutingMetadata()
);
assertNotNull(
internalCluster().client(nodes_in_zone_b.get(0))
.admin()
.cluster()
.state(new ClusterStateRequest().local(true))
.get()
.getState()
.metadata()
.weightedRoutingMetadata()
);
assertNotNull(
internalCluster().client(nodes_in_zone_c.get(0))
.admin()
.cluster()
.state(new ClusterStateRequest().local(true))
.get()
.getState()
.metadata()
.weightedRoutingMetadata()
);
assertNotNull(
internalCluster().client(internalCluster().getClusterManagerName())
.admin()
.cluster()
.state(new ClusterStateRequest().local(true))
.get()
.getState()
.metadata()
.weightedRoutingMetadata()
);

internalCluster().restartNode(internalCluster().getClusterManagerName(), new InternalTestCluster.RestartCallback());
ensureGreen();
assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());

// make sure restarted node joins the cluster
assertEquals(3, internalCluster().clusterService().state().nodes().getDataNodes().size());
assertNotNull(
internalCluster().client(nodes_in_zone_a.get(0))
.admin()
.cluster()
.state(new ClusterStateRequest().local(true))
.get()
.getState()
.metadata()
.weightedRoutingMetadata()
);
assertNotNull(
internalCluster().client(nodes_in_zone_b.get(0))
.admin()
.cluster()
.state(new ClusterStateRequest().local(true))
.get()
.getState()
.metadata()
.weightedRoutingMetadata()
);
assertNotNull(
internalCluster().client(nodes_in_zone_c.get(0))
.admin()
.cluster()
.state(new ClusterStateRequest().local(true))
.get()
.getState()
.metadata()
.weightedRoutingMetadata()
);
assertNotNull(
internalCluster().client(internalCluster().getClusterManagerName())
.admin()
.cluster()
.state(new ClusterStateRequest().local(true))
.get()
.getState()
.metadata()
.weightedRoutingMetadata()
);

}
}
Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
* Contains metadata for weighted routing
@@ -99,7 +100,7 @@ public static NamedDiff<Metadata.Custom> readDiffFrom(StreamInput in) throws IOE
public static WeightedRoutingMetadata fromXContent(XContentParser parser) throws IOException {
String attrKey = null;
Double attrValue;
String attributeName = null;
String attributeName = "";
anshu1106 marked this conversation as resolved.
Show resolved Hide resolved
Map<String, Double> weights = new HashMap<>();
WeightedRouting weightedRouting;
XContentParser.Token token;
@@ -162,12 +163,12 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
WeightedRoutingMetadata that = (WeightedRoutingMetadata) o;
return weightedRouting.equals(that.weightedRouting);
return weightedRouting.equals(that.weightedRouting) && version == that.version;
}

@Override
public int hashCode() {
return weightedRouting.hashCode();
return Objects.hash(weightedRouting.hashCode(), version);
}

@Override
Original file line number Diff line number Diff line change
@@ -54,6 +54,7 @@ public boolean isSet() {

@Override
public void writeTo(StreamOutput out) throws IOException {

out.writeString(attributeName);
out.writeGenericValue(weights);
}
Original file line number Diff line number Diff line change
@@ -8,29 +8,60 @@

package org.opensearch.cluster.metadata;

import org.opensearch.cluster.ClusterModule;
import org.opensearch.cluster.Diff;
import org.opensearch.cluster.routing.WeightedRouting;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.test.AbstractXContentTestCase;
import org.opensearch.test.AbstractDiffableSerializationTestCase;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class WeightedRoutingMetadataTests extends AbstractXContentTestCase<WeightedRoutingMetadata> {
public class WeightedRoutingMetadataTests extends AbstractDiffableSerializationTestCase<Metadata.Custom> {

@Override
protected Writeable.Reader<Metadata.Custom> instanceReader() {
return WeightedRoutingMetadata::new;
}

@Override
protected WeightedRoutingMetadata createTestInstance() {
String attributeName = "zone";
Map<String, Double> weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0);
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);
if (randomBoolean()) {
weights = new HashMap<>();
attributeName = "";
}
WeightedRouting weightedRouting = new WeightedRouting(attributeName, weights);
WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, -1);

return weightedRoutingMetadata;
}

@Override
protected NamedWriteableRegistry getNamedWriteableRegistry() {
return new NamedWriteableRegistry(ClusterModule.getNamedWriteables());
}

@Override
protected WeightedRoutingMetadata doParseInstance(XContentParser parser) throws IOException {
return WeightedRoutingMetadata.fromXContent(parser);
}

@Override
protected boolean supportsUnknownFields() {
return false;
protected Metadata.Custom makeTestChanges(Metadata.Custom testInstance) {

WeightedRouting weightedRouting = new WeightedRouting("", new HashMap<>());
WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, -1);
return weightedRoutingMetadata;
}

@Override
protected Writeable.Reader<Diff<Metadata.Custom>> diffReader() {
return WeightedRoutingMetadata::readDiffFrom;
}

}