Skip to content

Commit

Permalink
Merge branch 'main' into kp/integration-test-stalekeys
Browse files Browse the repository at this point in the history
Signed-off-by: Kiran Prakash <[email protected]>
  • Loading branch information
kiranprakash154 authored Apr 2, 2024
2 parents 4087280 + 9ba8b4f commit 3f39a4b
Show file tree
Hide file tree
Showing 99 changed files with 3,186 additions and 486 deletions.
27 changes: 27 additions & 0 deletions .github/workflows/detect-breaking-change.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
name: "Detect Breaking Changes"
on:
pull_request

jobs:
detect-breaking-change:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-java@v4
with:
distribution: temurin # Temurin is a distribution of adoptium
java-version: 21
- uses: gradle/gradle-build-action@v3
with:
cache-disabled: true
arguments: japicmp
gradle-version: 8.7
build-root-directory: server
- if: failure()
run: cat server/build/reports/java-compatibility/report.txt
- if: failure()
uses: actions/upload-artifact@v4
with:
name: java-compatibility-report.html
path: ${{ github.workspace }}/server/build/reports/java-compatibility/report.html

5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,18 +103,22 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 2.x]
### Added
- Constant Keyword Field ([#12285](https://github.com/opensearch-project/OpenSearch/pull/12285))
- Convert ingest processor supports ip type ([#12818](https://github.com/opensearch-project/OpenSearch/pull/12818))
- Add a counter to node stat api to track shard going from idle to non-idle ([#12768](https://github.com/opensearch-project/OpenSearch/pull/12768))
- Allow setting KEYSTORE_PASSWORD through env variable ([#12865](https://github.com/opensearch-project/OpenSearch/pull/12865))
- [Concurrent Segment Search] Perform buildAggregation concurrently and support Composite Aggregations ([#12697](https://github.com/opensearch-project/OpenSearch/pull/12697))
- [Concurrent Segment Search] Disable concurrent segment search for system indices and throttled requests ([#12954](https://github.com/opensearch-project/OpenSearch/pull/12954))
- [Tiered caching] Make Indices Request Cache Stale Key Mgmt Threshold setting dynamic ([#12941](https://github.com/opensearch-project/OpenSearch/pull/12941))
- Detect breaking changes on pull requests ([#9044](https://github.com/opensearch-project/OpenSearch/pull/9044))
- Add cluster primary balance contraint for rebalancing with buffer ([#12656](https://github.com/opensearch-project/OpenSearch/pull/12656))

### Dependencies
- Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))
- Bump `asm` from 9.6 to 9.7 ([#12908](https://github.com/opensearch-project/OpenSearch/pull/12908))
- Bump `net.minidev:json-smart` from 2.5.0 to 2.5.1 ([#12893](https://github.com/opensearch-project/OpenSearch/pull/12893))
- Bump `netty` from 4.1.107.Final to 4.1.108.Final ([#12924](https://github.com/opensearch-project/OpenSearch/pull/12924))
- Bump `commons-io:commons-io` from 2.15.1 to 2.16.0 ([#12996](https://github.com/opensearch-project/OpenSearch/pull/12996))

### Changed
- [BWC and API enforcement] Enforcing the presence of API annotations at build time ([#12872](https://github.com/opensearch-project/OpenSearch/pull/12872))
Expand All @@ -126,6 +130,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

### Fixed
- Fix issue with feature flags where default value may not be honored ([#12849](https://github.com/opensearch-project/OpenSearch/pull/12849))
- Fix UOE While building Exists query for nested search_as_you_type field ([#12048](https://github.com/opensearch-project/OpenSearch/pull/12048))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.lucene.search.DisjunctionMaxQuery;
import org.apache.lucene.search.MatchNoDocsQuery;
import org.apache.lucene.search.MultiPhraseQuery;
import org.apache.lucene.search.NormsFieldExistsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.SynonymQuery;
import org.apache.lucene.search.TermQuery;
Expand All @@ -68,6 +69,7 @@
import org.opensearch.index.query.MatchPhraseQueryBuilder;
import org.opensearch.index.query.MultiMatchQueryBuilder;
import org.opensearch.index.query.QueryShardContext;
import org.opensearch.index.query.QueryStringQueryBuilder;
import org.opensearch.plugins.Plugin;

import java.io.IOException;
Expand Down Expand Up @@ -541,6 +543,31 @@ public void testMatchPhrase() throws IOException {
}
}

public void testNestedExistsQuery() throws IOException {
MapperService mapperService = createMapperService(mapping(b -> {
b.startObject("field");
{
b.field("type", "object");
b.startObject("properties");
{
b.startObject("nested_field");
{
b.field("type", "search_as_you_type");
}
b.endObject();
}
b.endObject();
}
b.endObject();
}));
QueryShardContext queryShardContext = createQueryShardContext(mapperService);
Query actual = new QueryStringQueryBuilder("field:*").toQuery(queryShardContext);
Query expected = new ConstantScoreQuery(
new BooleanQuery.Builder().add(new NormsFieldExistsQuery("field.nested_field"), BooleanClause.Occur.SHOULD).build()
);
assertEquals(expected, actual);
}

private static BooleanQuery buildBoolPrefixQuery(String shingleFieldName, String prefixFieldName, List<String> terms) {
final BooleanQuery.Builder builder = new BooleanQuery.Builder();
for (int i = 0; i < terms.size() - 1; i++) {
Expand Down
2 changes: 1 addition & 1 deletion plugins/repository-hdfs/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ dependencies {
api 'commons-collections:commons-collections:3.2.2'
api "org.apache.commons:commons-compress:${versions.commonscompress}"
api 'org.apache.commons:commons-configuration2:2.10.1'
api 'commons-io:commons-io:2.15.1'
api 'commons-io:commons-io:2.16.0'
api 'org.apache.commons:commons-lang3:3.14.0'
implementation 'com.google.re2j:re2j:1.7'
api 'javax.servlet:servlet-api:2.5'
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
27875a7935f1ddcc13267eb6fae1f719e0409572
79 changes: 79 additions & 0 deletions server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ plugins {
id('opensearch.publish')
id('opensearch.internal-cluster-test')
id('opensearch.optional-dependencies')
id('me.champeau.gradle.japicmp') version '0.4.2'
}

publishing {
Expand Down Expand Up @@ -378,3 +379,81 @@ tasks.named("sourcesJar").configure {
duplicatesStrategy = DuplicatesStrategy.EXCLUDE
}
}

/** Compares the current build against a snapshot build */
tasks.register("japicmp", me.champeau.gradle.japicmp.JapicmpTask) {
oldClasspath.from(files("${buildDir}/snapshot/opensearch-${version}.jar"))
newClasspath.from(tasks.named('jar'))
onlyModified = true
failOnModification = true
ignoreMissingClasses = true
annotationIncludes = ['@org.opensearch.common.annotation.PublicApi']
txtOutputFile = layout.buildDirectory.file("reports/java-compatibility/report.txt")
htmlOutputFile = layout.buildDirectory.file("reports/java-compatibility/report.html")
dependsOn downloadSnapshot
}

/** If the Java API Comparison task failed, print a hint if the change should be merged from its target branch */
gradle.taskGraph.afterTask { Task task, TaskState state ->
if (task.name == 'japicmp' && state.failure != null) {
def sha = getGitShaFromJar("${buildDir}/snapshot/opensearch-${version}.jar")
logger.info("Incompatiable java api from snapshot jar built off of commit ${sha}")

if (!inHistory(sha)) {
logger.warn('\u001B[33mPlease merge from the target branch and run this task again.\u001B[0m')
}
}
}

/** Downloads latest snapshot from maven repository */
tasks.register("downloadSnapshot", Copy) {
def mavenSnapshotRepoUrl = "https://aws.oss.sonatype.org/content/repositories/snapshots/"
def groupId = "org.opensearch"
def artifactId = "opensearch"

repositories {
maven {
url mavenSnapshotRepoUrl
}
}

configurations {
snapshotArtifact
}

dependencies {
snapshotArtifact("${groupId}:${artifactId}:${version}:")
}

from configurations.snapshotArtifact
into "$buildDir/snapshot"
}

/** Check if the sha is in the current history */
def inHistory(String sha) {
try {
def commandCheckSha = "git merge-base --is-ancestor ${sha} HEAD"
commandCheckSha.execute()
return true
} catch (Exception) {
return false
}
}

/** Extracts the Git SHA used to build a jar from its manifest */
def getGitShaFromJar(String jarPath) {
def sha = ''
try {
// Open the JAR file
def jarFile = new java.util.jar.JarFile(jarPath)
// Get the manifest from the JAR file
def manifest = jarFile.manifest
def attributes = manifest.mainAttributes
// Assuming the Git SHA is stored under an attribute named 'Git-SHA'
sha = attributes.getValue('Change')
jarFile.close()
} catch (IOException e) {
println "Failed to read the JAR file: $e.message"
}
return sha
}
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,8 @@ public static final IndexShard newIndexShard(
nodeId,
null,
DefaultRemoteStoreSettings.INSTANCE,
false
false,
IndexShardTestUtils.getFakeDiscoveryNodes(initializingShardRouting)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
import java.util.stream.Collectors;

import static org.opensearch.cluster.routing.ShardRoutingState.STARTED;
import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.PREFER_PRIMARY_SHARD_BALANCE;
import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.PREFER_PRIMARY_SHARD_REBALANCE;
import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.PRIMARY_SHARD_REBALANCE_BUFFER;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
Expand Down Expand Up @@ -58,6 +61,20 @@ public void enablePreferPrimaryBalance() {
);
}

public void setAllocationRelocationStrategy(boolean preferPrimaryBalance, boolean preferPrimaryRebalance, float buffer) {
assertAcked(
client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(
Settings.builder()
.put(PREFER_PRIMARY_SHARD_BALANCE.getKey(), preferPrimaryBalance)
.put(PREFER_PRIMARY_SHARD_REBALANCE.getKey(), preferPrimaryRebalance)
.put(PRIMARY_SHARD_REBALANCE_BUFFER.getKey(), buffer)
)
);
}

/**
* This test verifies that the overall primary balance is attained during allocation. This test verifies primary
* balance per index and across all indices is maintained.
Expand Down Expand Up @@ -87,7 +104,7 @@ public void testGlobalPrimaryAllocation() throws Exception {
state = client().admin().cluster().prepareState().execute().actionGet().getState();
logger.info(ShardAllocations.printShardDistribution(state));
verifyPerIndexPrimaryBalance();
verifyPrimaryBalance();
verifyPrimaryBalance(0.0f);
}

/**
Expand Down Expand Up @@ -224,6 +241,70 @@ public void testAllocationWithDisruption() throws Exception {
verifyPerIndexPrimaryBalance();
}

/**
* Similar to testSingleIndexShardAllocation test but creates multiple indices, multiple nodes adding in and getting
* removed. The test asserts post each such event that primary shard distribution is balanced for each index as well as across the nodes
* when the PREFER_PRIMARY_SHARD_REBALANCE is set to true
*/
public void testAllocationAndRebalanceWithDisruption() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final int maxReplicaCount = 2;
final int maxShardCount = 2;
// Create higher number of nodes than number of shards to reduce chances of SameShardAllocationDecider kicking-in
// and preventing primary relocations
final int nodeCount = randomIntBetween(5, 10);
final int numberOfIndices = randomIntBetween(1, 10);
final float buffer = randomIntBetween(1, 4) * 0.10f;

logger.info("--> Creating {} nodes", nodeCount);
final List<String> nodeNames = new ArrayList<>();
for (int i = 0; i < nodeCount; i++) {
nodeNames.add(internalCluster().startNode());
}
setAllocationRelocationStrategy(true, true, buffer);

int shardCount, replicaCount;
ClusterState state;
for (int i = 0; i < numberOfIndices; i++) {
shardCount = randomIntBetween(1, maxShardCount);
replicaCount = randomIntBetween(1, maxReplicaCount);
logger.info("--> Creating index test{} with primary {} and replica {}", i, shardCount, replicaCount);
createIndex("test" + i, shardCount, replicaCount, i % 2 == 0);
ensureGreen(TimeValue.timeValueSeconds(60));
if (logger.isTraceEnabled()) {
state = client().admin().cluster().prepareState().execute().actionGet().getState();
logger.info(ShardAllocations.printShardDistribution(state));
}
}
state = client().admin().cluster().prepareState().execute().actionGet().getState();
logger.info(ShardAllocations.printShardDistribution(state));
verifyPerIndexPrimaryBalance();
verifyPrimaryBalance(buffer);

final int additionalNodeCount = randomIntBetween(1, 5);
logger.info("--> Adding {} nodes", additionalNodeCount);

internalCluster().startNodes(additionalNodeCount);
ensureGreen(TimeValue.timeValueSeconds(60));
state = client().admin().cluster().prepareState().execute().actionGet().getState();
logger.info(ShardAllocations.printShardDistribution(state));
verifyPerIndexPrimaryBalance();
verifyPrimaryBalance(buffer);

int nodeCountToStop = additionalNodeCount;
while (nodeCountToStop > 0) {
internalCluster().stopRandomDataNode();
// give replica a chance to promote as primary before terminating node containing the replica
ensureGreen(TimeValue.timeValueSeconds(60));
nodeCountToStop--;
}
state = client().admin().cluster().prepareState().execute().actionGet().getState();
logger.info("--> Cluster state post nodes stop {}", state);
logger.info(ShardAllocations.printShardDistribution(state));
verifyPerIndexPrimaryBalance();
verifyPrimaryBalance(buffer);
}

/**
* Utility method which ensures cluster has balanced primary shard distribution across a single index.
* @throws Exception exception
Expand Down Expand Up @@ -263,7 +344,7 @@ private void verifyPerIndexPrimaryBalance() throws Exception {
}, 60, TimeUnit.SECONDS);
}

private void verifyPrimaryBalance() throws Exception {
private void verifyPrimaryBalance(float buffer) throws Exception {
assertBusy(() -> {
final ClusterState currentState = client().admin().cluster().prepareState().execute().actionGet().getState();
RoutingNodes nodes = currentState.getRoutingNodes();
Expand All @@ -278,7 +359,7 @@ private void verifyPrimaryBalance() throws Exception {
.filter(ShardRouting::primary)
.collect(Collectors.toList())
.size();
assertTrue(primaryCount <= avgPrimaryShardsPerNode);
assertTrue(primaryCount <= (avgPrimaryShardsPerNode * (1 + buffer)));
}
}, 60, TimeUnit.SECONDS);
}
Expand Down
Loading

0 comments on commit 3f39a4b

Please sign in to comment.