diff --git a/client/rest-high-level/src/test/java/org/opensearch/client/PitIT.java b/client/rest-high-level/src/test/java/org/opensearch/client/PitIT.java index 5ec1da77a6795..c384f679f1852 100644 --- a/client/rest-high-level/src/test/java/org/opensearch/client/PitIT.java +++ b/client/rest-high-level/src/test/java/org/opensearch/client/PitIT.java @@ -11,23 +11,26 @@ import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpPut; import org.opensearch.OpenSearchStatusException; -import org.opensearch.action.search.CreatePitRequest; -import org.opensearch.action.search.CreatePitResponse; -import org.opensearch.action.search.DeletePitInfo; -import org.opensearch.action.search.DeletePitRequest; -import org.opensearch.action.search.DeletePitResponse; -import org.opensearch.action.search.GetAllPitNodesResponse; +import org.opensearch.action.search.*; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.action.ActionListener; import org.junit.Before; +import org.opensearch.monitor.jvm.JvmInfo; import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import static org.hamcrest.Matchers.containsString; + /** * Tests point in time API with rest high level client */ @@ -72,6 +75,84 @@ public void testCreateAndDeletePit() throws IOException { assertTrue(deletePitResponse.getDeletePitResults().get(0).getPitId().equals(createPitResponse.getId())); } + public void testMaxRunningSearches() throws Exception { + MemoryMXBean MEMORY_MX_BEAN = ManagementFactory.getMemoryMXBean(); + logger.info("USED size here : {}", MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed()/1024/1024); + logger.info("heap size of env[{}]", JvmInfo.jvmInfo().getMem().getHeapMax()); + try { + int numThreads = 50; + List threadsList = new LinkedList<>(); + logger.info(threadsList.size()); + CyclicBarrier barrier = new CyclicBarrier(numThreads + 1); + for (int i = 0; i < numThreads; i++) { + threadsList.add(new Thread(() -> { + try { + SearchRequest validRequest = new SearchRequest(); + validRequest.indices("index"); + logger.info("USED size before : {}", MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed() / 1024 / 1024); + //client + + SearchResponse searchResponse = execute(validRequest, highLevelClient()::search, highLevelClient()::searchAsync); + assertNotNull(searchResponse); + logger.info("USED size after : {}", MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed() / 1024 / 1024); + } catch (IOException e) { + fail("submit request failed"); + } finally { + try { + + barrier.await(); + } catch (Exception e) { + fail(); + } + } + } + )); + } + threadsList.forEach(Thread::start); + barrier.await(); + for (Thread thread : threadsList) { + logger.info("USED size thread : {}", MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed() / 1024 / 1024); + thread.join(); + } + + + //updateClusterSettings(AsynchronousSearchActiveStore.NODE_CONCURRENT_RUNNING_SEARCHES_SETTING.getKey(), 0); + threadsList.clear(); + AtomicInteger numFailures = new AtomicInteger(); + for (int i = 0; i < numThreads; i++) { + threadsList.add(new Thread(() -> { + try { + SearchRequest validRequest = new SearchRequest(); + //validRequest.waitForCompletionTimeout(TimeValue.timeValueMillis(1)); + SearchResponse searchResponse = execute(validRequest, highLevelClient()::search, highLevelClient()::searchAsync); + } catch (Exception e) { + assertTrue(e instanceof ResponseException); + assertThat(e.getMessage(), containsString("Trying to create too many concurrent searches")); + numFailures.getAndIncrement(); + + } finally { + try { + numFailures.getAndIncrement(); + barrier.await(); + } catch (Exception e) { + fail(); + } + } + } + )); + } + threadsList.forEach(Thread::start); + barrier.await(); + for (Thread thread : threadsList) { + thread.join(); + } + assertEquals(numFailures.get(), 50); + } catch (Exception e) { + logger.info("========== EXCEPTION : " + e.getMessage()); + logger.info("============== USED SIZE : " + MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed() / 1024 / 1024); + } + } + public void testDeleteAllAndListAllPits() throws IOException, InterruptedException { CreatePitRequest pitRequest = new CreatePitRequest(new TimeValue(1, TimeUnit.DAYS), true, "index"); CreatePitResponse pitResponse = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync); diff --git a/gradle.properties b/gradle.properties index 7c359ed2b652c..a4e7aacf8870f 100644 --- a/gradle.properties +++ b/gradle.properties @@ -13,13 +13,13 @@ org.gradle.caching=true org.gradle.warning.mode=none org.gradle.parallel=true -org.gradle.jvmargs=-Xmx3g -XX:+HeapDumpOnOutOfMemoryError -Xss2m \ +org.gradle.jvmargs=-Xmx512M -XX:+HeapDumpOnOutOfMemoryError -Xss2m \ --add-exports jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED \ --add-exports jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED \ --add-exports jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED \ --add-exports jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED \ --add-exports jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED -options.forkOptions.memoryMaximumSize=3g +options.forkOptions.memoryMaximumSize=512M # Disable Gradle Enterprise Gradle plugin's test retry systemProp.gradle.enterprise.testretry.enabled=false