Skip to content

Commit

Permalink
threadpool
Browse files Browse the repository at this point in the history
Signed-off-by: Bharathwaj G <[email protected]>
  • Loading branch information
bharath-techie committed Sep 6, 2023
1 parent b6dfbf3 commit 0169e6f
Show file tree
Hide file tree
Showing 13 changed files with 30 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ public SearchTask createTask(long id, String type, String action, TaskId parentT
}
latch.await();
} finally {
ThreadPool.terminate(threadPool, 100, TimeUnit.MILLISECONDS);
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ public void onFailure(Exception e) {
disableBlocks(plugins);
assertionConsumer.apply(numDeleteAcknowledged, numDeleteUnAcknowledged, numResourceNotFound);
} finally {
ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS);
ThreadPool.terminate(testThreadPool, 10, TimeUnit.SECONDS);
}
}

Expand Down Expand Up @@ -241,7 +241,7 @@ public void onFailure(Exception e) {
countDownLatch.await();
assertionConsumer.apply(numDeleteAcknowledged, numDeleteUnAcknowledged, numResourceNotFound);
} finally {
ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS);
ThreadPool.terminate(testThreadPool, 10, TimeUnit.SECONDS);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ public void onFailure(Exception e) {
disableBlocks(plugins);
assertionConsumer.apply(numGetSuccess, numGetFailures, numVersionConflictFailures);
} finally {
ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS);
ThreadPool.terminate(testThreadPool, 10, TimeUnit.SECONDS);
}
}

Expand Down Expand Up @@ -333,7 +333,7 @@ public void onFailure(Exception e) {
}

} finally {
ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS);
ThreadPool.terminate(testThreadPool, 10, TimeUnit.SECONDS);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public void onFailure(Exception e) {
assertionConsumer.apply(numSuccess, numGetFailures, numVersionConflictFailures, numResourceNotFoundFailures);
}
} finally {
ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS);
ThreadPool.terminate(testThreadPool, 10, TimeUnit.SECONDS);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public void onFailure(Exception e) {
disableBlocks(plugins);
assertionConsumer.apply(numStartedAsynchronousSearch, numFailedAsynchronousSearch, numRejectedAsynchronousSearch);
} finally {
ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS);
ThreadPool.terminate(testThreadPool, 10, TimeUnit.SECONDS);
}
}

Expand Down Expand Up @@ -232,7 +232,7 @@ public void onFailure(Exception e) {
countDownLatch.await();
assertionConsumer.apply(numStartedAsynchronousSearch, numFailedAsynchronousSearch, numErrorResponseAsynchronousSearch);
} finally {
ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS);
ThreadPool.terminate(testThreadPool, 10, TimeUnit.SECONDS);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public SearchTask createTask(long id, String type, String action, TaskId parentT
assertThat(exceptionRef.get(), instanceOf(SearchPhaseExecutionException.class));
}
} finally {
ThreadPool.terminate(threadPool, 100, TimeUnit.MILLISECONDS);
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public SearchTask createTask(long id, String type, String action, TaskId parentT
List<? extends Terms.Bucket> buckets = terms.getBuckets();
assertThat(buckets.size(), equalTo(aggregationSize));
} finally {
ThreadPool.terminate(threadPool, 100, TimeUnit.MILLISECONDS);
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public SearchTask createTask(long id, String type, String action, TaskId parentT
latch.await();
AsynchronousSearchAssertions.assertSearchResponses(responseRef.get(), listener.partialResponse());
} finally {
ThreadPool.terminate(threadPool, 100, TimeUnit.MILLISECONDS);
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void testListenerOnResponseForSuccessfulSearch() throws InterruptedExcept
assertEquals(mockAsynchronousSearchResp, responseList.get(i).get());
}
} finally {
ThreadPool.terminate(threadPool, 100, TimeUnit.MILLISECONDS);
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
}
}

Expand Down Expand Up @@ -130,7 +130,7 @@ public void testListenerOnResponseForFailedSearch() throws InterruptedException
assertEquals(mockAsynchronousSearchFailResp, responseList.get(i).get());
}
} finally {
ThreadPool.terminate(threadPool, 100, TimeUnit.MILLISECONDS);
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
}
}

Expand Down Expand Up @@ -167,7 +167,7 @@ public void testListenerOnFailureForFailedSearch() throws InterruptedException {
assertEquals(mockPostProcessingException, exceptionList.get(i).get());
}
} finally {
ThreadPool.terminate(threadPool, 100, TimeUnit.MILLISECONDS);
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
}
}

Expand Down Expand Up @@ -204,7 +204,7 @@ public void testListenerOnFailureForSuccessfulSearch() throws InterruptedExcepti
assertEquals(mockPostProcessingException, exceptionList.get(i).get());
}
} finally {
ThreadPool.terminate(threadPool, 100, TimeUnit.MILLISECONDS);
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void testSubmitInvalidWaitForCompletion() throws Exception {
}

public void testMaxRunningAsynchronousSearchContexts() throws Exception {
int numThreads = 50;
int numThreads = 4000;
List<Thread> threadsList = new LinkedList<>();
CyclicBarrier barrier = new CyclicBarrier(numThreads + 1);
for (int i = 0; i < numThreads; i++) {
Expand Down Expand Up @@ -116,7 +116,7 @@ public void testMaxRunningAsynchronousSearchContexts() throws Exception {
for (Thread thread : threadsList) {
thread.join();
}
assertEquals(numFailures.get(), 50);
assertEquals(numFailures.get(), 4000);
updateClusterSettings(AsynchronousSearchActiveStore.NODE_CONCURRENT_RUNNING_SEARCHES_SETTING.getKey(),
AsynchronousSearchActiveStore.NODE_CONCURRENT_RUNNING_SEARCHES);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public void testProcessSearchFailureOnDeletedContext() throws AsynchronousSearch
assertFalse(activeContextCleanUpConsumerInvocation.get());
assertEquals(0, fakeClient.persistenceCount);
} finally {
ThreadPool.terminate(testThreadPool, 200, TimeUnit.MILLISECONDS);
ThreadPool.terminate(testThreadPool, 10, TimeUnit.SECONDS);
}
}

Expand Down Expand Up @@ -180,7 +180,7 @@ public void testProcessSearchResponseBeginPersistence() throws AsynchronousSearc
waitUntil(() -> fakeClient.persistenceCount == 1);
assertEquals(1, fakeClient.persistenceCount);
} finally {
ThreadPool.terminate(testThreadPool, 200, TimeUnit.MILLISECONDS);
ThreadPool.terminate(testThreadPool, 10, TimeUnit.SECONDS);
}
}

Expand Down Expand Up @@ -226,7 +226,7 @@ public void testProcessSearchResponsePersisted() throws AsynchronousSearchStateM
} catch (InterruptedException e) {
fail("Test interrupted " + e.getMessage());
} finally {
ThreadPool.terminate(testThreadPool, 200, TimeUnit.MILLISECONDS);
ThreadPool.terminate(testThreadPool, 10, TimeUnit.SECONDS);
}
}

Expand Down Expand Up @@ -271,7 +271,7 @@ public void testProcessSearchResponseForExpiredContext() throws AsynchronousSear
} catch (InterruptedException e) {
fail("Interrupted exception" + e.getMessage());
} finally {
ThreadPool.terminate(testThreadPool, 200, TimeUnit.MILLISECONDS);
ThreadPool.terminate(testThreadPool, 10, TimeUnit.SECONDS);
}
}

Expand Down Expand Up @@ -315,7 +315,7 @@ public void testProcessSearchResponseOnClosedContext() throws AsynchronousSearch
assertFalse(activeContextCleanUpConsumerInvocation.get());
assertEquals(0, fakeClient.persistenceCount);
} finally {
ThreadPool.terminate(testThreadPool, 200, TimeUnit.MILLISECONDS);
ThreadPool.terminate(testThreadPool, 10, TimeUnit.SECONDS);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ public void testFindContext() throws InterruptedException {
freeContextLatch.await();
}
} finally {
ThreadPool.terminate(testThreadPool, 200, TimeUnit.MILLISECONDS);
ThreadPool.terminate(testThreadPool, 10, TimeUnit.SECONDS);
}
}

Expand Down Expand Up @@ -241,7 +241,7 @@ public void testUpdateExpirationTimesOutBlockedOnPersistence() throws Interrupte
updateLatch.await();
fakeClient.releaseBlock();
} finally {
ThreadPool.terminate(testThreadPool, 200, TimeUnit.MILLISECONDS);
ThreadPool.terminate(testThreadPool, 10, TimeUnit.SECONDS);
}
}

Expand Down Expand Up @@ -303,7 +303,7 @@ public void testUpdateExpirationOnRunningSearch() throws InterruptedException {
}, e -> fail()), updateLatch));
updateLatch.await();
} finally {
ThreadPool.terminate(testThreadPool, 200, TimeUnit.MILLISECONDS);
ThreadPool.terminate(testThreadPool, 10, TimeUnit.SECONDS);
}
}

Expand Down Expand Up @@ -337,7 +337,7 @@ public void testFindContextOnNonExistentSearch() throws InterruptedException {
randomNonNegativeLong()), user1, failureExpectingListener);
findContextLatch.await();
} finally {
ThreadPool.terminate(testThreadPool, 200, TimeUnit.MILLISECONDS);
ThreadPool.terminate(testThreadPool, 10, TimeUnit.SECONDS);
}
}

Expand Down Expand Up @@ -379,7 +379,7 @@ public void testBootStrapOnClosedSearch() {
expectThrows(IllegalStateException.class, () -> asService.bootstrapSearch(task,
context.getContextId()));
} finally {
ThreadPool.terminate(testThreadPool, 200, TimeUnit.MILLISECONDS);
ThreadPool.terminate(testThreadPool, 10, TimeUnit.SECONDS);
}
}

Expand Down Expand Up @@ -433,7 +433,7 @@ public boolean isCancelled() {
new LatchedActionListener<>(wrap(r -> fail(), e -> assertTrue(e instanceof ResourceNotFoundException)), latch));
latch.await();
} finally {
ThreadPool.terminate(testThreadPool, 200, TimeUnit.MILLISECONDS);
ThreadPool.terminate(testThreadPool, 10, TimeUnit.SECONDS);
}
}

Expand Down Expand Up @@ -486,7 +486,7 @@ public boolean isCancelled() {
new LatchedActionListener<>(wrap(Assert::assertTrue, e -> fail()), latch));
latch.await();
} finally {
ThreadPool.terminate(testThreadPool, 200, TimeUnit.MILLISECONDS);
ThreadPool.terminate(testThreadPool, 10, TimeUnit.SECONDS);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public void testStateMachine() throws InterruptedException, BrokenBarrierExcepti
assertEquals("success:" + success, 0, customContextListener.getRunningCount());
assertEquals(1, customContextListener.getDeletedCount());
} finally {
ThreadPool.terminate(threadPool, 100, TimeUnit.MILLISECONDS);
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
}
}

Expand Down

0 comments on commit 0169e6f

Please sign in to comment.