Skip to content

Commit

Permalink
Merge branch 'main' into update.lucene.9.12.1
Browse files Browse the repository at this point in the history
Signed-off-by: Andriy Redko <[email protected]>
  • Loading branch information
reta authored Dec 16, 2024
2 parents bbc1381 + ef44e86 commit 68f7849
Show file tree
Hide file tree
Showing 12 changed files with 103 additions and 18 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add stats for remote publication failure and move download failure stats to remote methods([#16682](https://github.com/opensearch-project/OpenSearch/pull/16682/))
- Added a precaution to handle extreme date values during sorting to prevent `arithmetic_exception: long overflow` ([#16812](https://github.com/opensearch-project/OpenSearch/pull/16812)).
- Add search replica stats to segment replication stats API ([#16678](https://github.com/opensearch-project/OpenSearch/pull/16678))
- Introduce a setting to disable download of full cluster state from remote on term mismatch([#16798](https://github.com/opensearch-project/OpenSearch/pull/16798/))

### Dependencies
- Bump `com.google.cloud:google-cloud-core-http` from 2.23.0 to 2.47.0 ([#16504](https://github.com/opensearch-project/OpenSearch/pull/16504))
Expand All @@ -44,6 +45,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bump `ch.qos.logback:logback-classic` from 1.2.13 to 1.5.12 ([#16716](https://github.com/opensearch-project/OpenSearch/pull/16716))
- Bump `com.azure:azure-identity` from 1.13.2 to 1.14.2 ([#16778](https://github.com/opensearch-project/OpenSearch/pull/16778))
- Bump Apache Lucene from 9.12.0 to 9.12.1 ([#16846](https://github.com/opensearch-project/OpenSearch/pull/16846))
- Bump `com.gradle.develocity` from 3.18.2 to 3.19 ([#16855](https://github.com/opensearch-project/OpenSearch/pull/16855))

### Changed
- Indexed IP field supports `terms_query` with more than 1025 IP masks [#16391](https://github.com/opensearch-project/OpenSearch/pull/16391)
Expand All @@ -69,6 +71,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Ensure consistency of system flag on IndexMetadata after diff is applied ([#16644](https://github.com/opensearch-project/OpenSearch/pull/16644))
- Skip remote-repositories validations for node-joins when RepositoriesService is not in sync with cluster-state ([#16763](https://github.com/opensearch-project/OpenSearch/pull/16763))
- Fix _list/shards API failing when closed indices are present ([#16606](https://github.com/opensearch-project/OpenSearch/pull/16606))
- Fix remote shards balance ([#15335](https://github.com/opensearch-project/OpenSearch/pull/15335))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ private GetTermVersionResponse buildResponse(GetTermVersionRequest request, Clus
ClusterStateTermVersion termVersion = new ClusterStateTermVersion(state);
if (discovery instanceof Coordinator) {
Coordinator coordinator = (Coordinator) discovery;
if (coordinator.isRemotePublicationEnabled()) {
if (coordinator.canDownloadFullStateFromRemote()) {
return new GetTermVersionResponse(termVersion, coordinator.isRemotePublicationEnabled());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1906,4 +1906,12 @@ public boolean isRemotePublicationEnabled() {
}
return false;
}

public boolean canDownloadFullStateFromRemote() {
if (remoteClusterStateService != null) {
return remoteClusterStateService.isRemotePublicationEnabled() && remoteClusterStateService.canDownloadFromRemoteForReadAPI();
}
return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -247,11 +247,17 @@ void balance() {
final Map<String, Integer> nodePrimaryShardCount = calculateNodePrimaryShardCount(remoteRoutingNodes);
int totalPrimaryShardCount = nodePrimaryShardCount.values().stream().reduce(0, Integer::sum);

totalPrimaryShardCount += routingNodes.unassigned().getNumPrimaries();
int avgPrimaryPerNode = (totalPrimaryShardCount + routingNodes.size() - 1) / routingNodes.size();
int unassignedRemotePrimaryShardCount = 0;
for (ShardRouting shard : routingNodes.unassigned()) {
if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shard, allocation)) && shard.primary()) {
unassignedRemotePrimaryShardCount++;
}
}
totalPrimaryShardCount += unassignedRemotePrimaryShardCount;
final int avgPrimaryPerNode = (totalPrimaryShardCount + remoteRoutingNodes.size() - 1) / remoteRoutingNodes.size();

ArrayDeque<RoutingNode> sourceNodes = new ArrayDeque<>();
ArrayDeque<RoutingNode> targetNodes = new ArrayDeque<>();
final ArrayDeque<RoutingNode> sourceNodes = new ArrayDeque<>();
final ArrayDeque<RoutingNode> targetNodes = new ArrayDeque<>();
for (RoutingNode node : remoteRoutingNodes) {
if (nodePrimaryShardCount.get(node.nodeId()) > avgPrimaryPerNode) {
sourceNodes.add(node);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,13 @@ public ClusterState state() {
return clusterState;
}

/**
* Returns true if the appliedClusterState is not null
*/
public boolean isStateInitialised() {
return this.state.get() != null;
}

/**
* Returns true if the appliedClusterState is not null
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,13 @@ public ClusterState state() {
return clusterApplierService.state();
}

/**
* Returns true if the state in appliedClusterState is not null
*/
public boolean isStateInitialised() {
return clusterApplierService.isStateInitialised();
}

/**
* The state that is persisted to store but may not be applied to cluster.
* @return ClusterState
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,8 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteClusterStateCleanupManager.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING,
RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING,
RemoteClusterStateService.REMOTE_PUBLICATION_SETTING,
RemoteClusterStateService.REMOTE_STATE_DOWNLOAD_TO_SERVE_READ_API,

INDEX_METADATA_UPLOAD_TIMEOUT_SETTING,
GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING,
METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ public class RemoteClusterStateService implements Closeable {
* Gates the functionality of remote publication.
*/
public static final String REMOTE_PUBLICATION_SETTING_KEY = "cluster.remote_store.publication.enabled";
public static final String REMOTE_STATE_DOWNLOAD_TO_SERVE_READ_API_KEY = "cluster.remote_state.download.serve_read_api.enabled";

public static final Setting<Boolean> REMOTE_PUBLICATION_SETTING = Setting.boolSetting(
REMOTE_PUBLICATION_SETTING_KEY,
Expand All @@ -137,6 +138,13 @@ public class RemoteClusterStateService implements Closeable {
Property.Dynamic
);

public static final Setting<Boolean> REMOTE_STATE_DOWNLOAD_TO_SERVE_READ_API = Setting.boolSetting(
REMOTE_STATE_DOWNLOAD_TO_SERVE_READ_API_KEY,
true,
Property.NodeScope,
Property.Dynamic
);

/**
* Used to specify if cluster state metadata should be published to remote store
*/
Expand Down Expand Up @@ -235,6 +243,9 @@ public static RemoteClusterStateValidationMode parseString(String mode) {
+ "indices, coordination metadata updated : [{}], settings metadata updated : [{}], templates metadata "
+ "updated : [{}], custom metadata updated : [{}], indices routing updated : [{}]";
private volatile AtomicBoolean isPublicationEnabled;

private volatile AtomicBoolean downloadFromRemoteForReadAPI;

private final String remotePathPrefix;

private final RemoteClusterStateCache remoteClusterStateCache;
Expand Down Expand Up @@ -281,6 +292,8 @@ public RemoteClusterStateService(
&& RemoteStoreNodeAttribute.isRemoteRoutingTableConfigured(settings)
);
clusterSettings.addSettingsUpdateConsumer(REMOTE_PUBLICATION_SETTING, this::setRemotePublicationSetting);
this.downloadFromRemoteForReadAPI = new AtomicBoolean(clusterSettings.get(REMOTE_STATE_DOWNLOAD_TO_SERVE_READ_API));
clusterSettings.addSettingsUpdateConsumer(REMOTE_STATE_DOWNLOAD_TO_SERVE_READ_API, this::setRemoteDownloadForReadAPISetting);
this.remotePathPrefix = CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX.get(settings);
this.remoteRoutingTableService = RemoteRoutingTableServiceFactory.getService(
repositoriesService,
Expand Down Expand Up @@ -1124,6 +1137,14 @@ private void setRemotePublicationSetting(boolean remotePublicationSetting) {
}
}

private void setRemoteDownloadForReadAPISetting(boolean remoteDownloadForReadAPISetting) {
this.downloadFromRemoteForReadAPI.set(remoteDownloadForReadAPISetting);
}

public boolean canDownloadFromRemoteForReadAPI() {
return this.downloadFromRemoteForReadAPI.get();
}

// Package private for unit test
RemoteRoutingTableService getRemoteRoutingTableService() {
return this.remoteRoutingTableService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2994,7 +2994,12 @@ public String startVerification() {
*/
private BlobContainer testContainer(String seed) {
BlobPath testBlobPath;
if (prefixModeVerification == true) {

if (prefixModeVerification == true
&& (clusterService.isStateInitialised() == false
|| clusterService.state().nodes().getMinNodeVersion().onOrAfter(Version.V_2_17_0))) {
// During the remote store node bootstrap, the cluster state is not initialised
// Otherwise, the cluster state is initialised and available with the min node version information
PathInput pathInput = PathInput.builder().basePath(basePath()).indexUUID(seed).build();
testBlobPath = PathType.HASHED_PREFIX.path(pathInput, FNV_1A_COMPOSITE_1);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ public AllocationService createRemoteCapableAllocationService() {
}

public AllocationService createRemoteCapableAllocationService(String excludeNodes) {
Settings settings = Settings.builder().put("cluster.routing.allocation.exclude.node_id", excludeNodes).build();
Settings settings = Settings.builder().put("cluster.routing.allocation.exclude._id", excludeNodes).build();
return new MockAllocationService(
randomAllocationDeciders(settings, EMPTY_CLUSTER_SETTINGS, random()),
new TestGatewayAllocator(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,51 @@ public class RemoteShardsRebalanceShardsTests extends RemoteShardsBalancerBaseTe
* Post rebalance primaries should be balanced across all the nodes.
*/
public void testShardAllocationAndRebalance() {
int localOnlyNodes = 20;
int remoteCapableNodes = 40;
int localIndices = 40;
int remoteIndices = 80;
final int localOnlyNodes = 20;
final int remoteCapableNodes = 40;
final int halfRemoteCapableNodes = remoteCapableNodes / 2;
final int localIndices = 40;
final int remoteIndices = 80;
ClusterState clusterState = createInitialCluster(localOnlyNodes, remoteCapableNodes, localIndices, remoteIndices);
AllocationService service = this.createRemoteCapableAllocationService();
final StringBuilder excludeNodes = new StringBuilder();
for (int i = 0; i < halfRemoteCapableNodes; i++) {
excludeNodes.append(getNodeId(i, true));
if (i != (remoteCapableNodes / 2 - 1)) {
excludeNodes.append(", ");
}
}
AllocationService service = this.createRemoteCapableAllocationService(excludeNodes.toString());
clusterState = allocateShardsAndBalance(clusterState, service);
RoutingNodes routingNodes = clusterState.getRoutingNodes();
RoutingAllocation allocation = getRoutingAllocation(clusterState, routingNodes);

final Map<String, Integer> nodePrimariesCounter = getShardCounterPerNodeForRemoteCapablePool(clusterState, allocation, true);
final Map<String, Integer> nodeReplicaCounter = getShardCounterPerNodeForRemoteCapablePool(clusterState, allocation, false);
Map<String, Integer> nodePrimariesCounter = getShardCounterPerNodeForRemoteCapablePool(clusterState, allocation, true);
Map<String, Integer> nodeReplicaCounter = getShardCounterPerNodeForRemoteCapablePool(clusterState, allocation, false);
int avgPrimariesPerNode = getTotalShardCountAcrossNodes(nodePrimariesCounter) / remoteCapableNodes;

// Primary and replica are balanced post first reroute
// Primary and replica are balanced after first allocating unassigned
for (RoutingNode node : routingNodes) {
if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getNodePool(node))) {
if (Integer.parseInt(node.nodeId().split("-")[4]) < halfRemoteCapableNodes) {
assertEquals(0, (int) nodePrimariesCounter.getOrDefault(node.nodeId(), 0));
} else {
assertEquals(avgPrimariesPerNode * 2, (int) nodePrimariesCounter.get(node.nodeId()));
}
assertTrue(nodeReplicaCounter.getOrDefault(node.nodeId(), 0) >= 0);
}
}

// Remove exclude constraint and rebalance
service = this.createRemoteCapableAllocationService();
clusterState = allocateShardsAndBalance(clusterState, service);
routingNodes = clusterState.getRoutingNodes();
allocation = getRoutingAllocation(clusterState, routingNodes);
nodePrimariesCounter = getShardCounterPerNodeForRemoteCapablePool(clusterState, allocation, true);
nodeReplicaCounter = getShardCounterPerNodeForRemoteCapablePool(clusterState, allocation, false);
for (RoutingNode node : routingNodes) {
if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getNodePool(node))) {
assertInRange(nodePrimariesCounter.get(node.nodeId()), avgPrimariesPerNode, remoteCapableNodes - 1);
assertTrue(nodeReplicaCounter.get(node.nodeId()) >= 0);
assertEquals(avgPrimariesPerNode, (int) nodePrimariesCounter.get(node.nodeId()));
assertTrue(nodeReplicaCounter.getOrDefault(node.nodeId(), 0) >= 0);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
*/

plugins {
id "com.gradle.develocity" version "3.18.2"
id "com.gradle.develocity" version "3.19"
}

ext.disableBuildCache = hasProperty('DISABLE_BUILD_CACHE') || System.getenv().containsKey('DISABLE_BUILD_CACHE')
Expand Down

0 comments on commit 68f7849

Please sign in to comment.