diff --git a/server/src/main/java/org/opensearch/index/shard/CloseableRetryableRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/CloseableRetryableRefreshListener.java index 6b9c619e3141a..23874603d3f52 100644 --- a/server/src/main/java/org/opensearch/index/shard/CloseableRetryableRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/CloseableRetryableRefreshListener.java @@ -35,6 +35,8 @@ public abstract class CloseableRetryableRefreshListener implements ReferenceMana */ private static final int TOTAL_PERMITS = 1; + private static final TimeValue DRAIN_TIMEOUT = TimeValue.timeValueMinutes(10); + private final AtomicBoolean closed = new AtomicBoolean(false); private final Semaphore semaphore = new Semaphore(TOTAL_PERMITS); @@ -187,7 +189,8 @@ private void scheduleRetry(boolean afterRefreshSuccessful, boolean didRefresh) { public final Releasable drainRefreshes() { try { - if (semaphore.tryAcquire(TOTAL_PERMITS, 10, TimeUnit.MINUTES)) { + TimeValue timeout = getDrainTimeout(); + if (semaphore.tryAcquire(TOTAL_PERMITS, timeout.seconds(), TimeUnit.SECONDS)) { boolean result = closed.compareAndSet(false, true); assert result && semaphore.availablePermits() == 0; getLogger().info("All permits are acquired and refresh listener is closed"); @@ -208,6 +211,14 @@ public final Releasable drainRefreshes() { protected abstract Logger getLogger(); + // Made available for unit testing purpose only + /** + * Returns the timeout which is used while draining refreshes. + */ + protected TimeValue getDrainTimeout() { + return DRAIN_TIMEOUT; + } + // Visible for testing /** * Returns if the retry is scheduled or not. @@ -217,4 +228,14 @@ public final Releasable drainRefreshes() { boolean getRetryScheduledStatus() { return retryScheduled.get(); } + + // Visible for testing + int availablePermits() { + return semaphore.availablePermits(); + } + + // Visible for testing + boolean isClosed() { + return closed.get(); + } } diff --git a/server/src/test/java/org/opensearch/index/shard/CloseableRetryableRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/CloseableRetryableRefreshListenerTests.java index cc192e4eefa0a..13694043c450a 100644 --- a/server/src/test/java/org/opensearch/index/shard/CloseableRetryableRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/CloseableRetryableRefreshListenerTests.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.common.lease.Releasable; import org.opensearch.common.unit.TimeValue; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; @@ -274,6 +275,7 @@ protected Logger getLogger() { testRefreshListener.afterRefresh(randomBoolean()); testRefreshListener.drainRefreshes(); assertNotEquals(0, countDownLatch.getCount()); + assertRefreshListenerClosed(testRefreshListener); } public void testCloseWaitsForAcquiringAllPermits() throws Exception { @@ -308,6 +310,7 @@ protected Logger getLogger() { thread.start(); assertBusy(() -> assertEquals(0, countDownLatch.getCount())); testRefreshListener.drainRefreshes(); + assertRefreshListenerClosed(testRefreshListener); } public void testScheduleRetryAfterClose() throws Exception { @@ -368,6 +371,7 @@ protected TimeValue getNextRetryInterval() { thread1.join(); thread2.join(); assertBusy(() -> assertEquals(1, runCount.get())); + assertRefreshListenerClosed(testRefreshListener); } public void testConcurrentScheduleRetry() throws Exception { @@ -409,6 +413,7 @@ protected boolean isRetryEnabled() { testRefreshListener.afterRefresh(true); assertBusy(() -> assertEquals(3, runCount.get())); testRefreshListener.drainRefreshes(); + assertRefreshListenerClosed(testRefreshListener); } public void testExceptionDuringThreadPoolSchedule() throws Exception { @@ -451,6 +456,120 @@ protected boolean isRetryEnabled() { assertBusy(() -> assertFalse(testRefreshListener.getRetryScheduledStatus())); assertEquals(1, runCount.get()); testRefreshListener.drainRefreshes(); + assertRefreshListenerClosed(testRefreshListener); + } + + public void testTimeoutDuringClose() throws Exception { + // This test checks the expected behaviour when the drainRefreshes times out. + CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(mock(ThreadPool.class)) { + @Override + protected boolean performAfterRefreshWithPermit(boolean didRefresh) { + try { + Thread.sleep(TimeValue.timeValueSeconds(2).millis()); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + return true; + } + + @Override + public void beforeRefresh() {} + + @Override + protected Logger getLogger() { + return logger; + } + + @Override + protected TimeValue getDrainTimeout() { + return TimeValue.timeValueSeconds(1); + } + }; + Thread thread1 = new Thread(() -> { + try { + testRefreshListener.afterRefresh(true); + } catch (IOException e) { + throw new AssertionError(e); + } + }); + thread1.start(); + assertBusy(() -> assertEquals(0, testRefreshListener.availablePermits())); + RuntimeException ex = assertThrows(RuntimeException.class, testRefreshListener::drainRefreshes); + assertEquals("Failed to acquire all permits", ex.getMessage()); + thread1.join(); + } + + public void testThreadInterruptDuringClose() throws Exception { + // This test checks the expected behaviour when the thread performing the drainRefresh is interrupted. + CountDownLatch latch = new CountDownLatch(2); + CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(mock(ThreadPool.class)) { + @Override + protected boolean performAfterRefreshWithPermit(boolean didRefresh) { + try { + Thread.sleep(TimeValue.timeValueSeconds(2).millis()); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + return true; + } + + @Override + public void beforeRefresh() {} + + @Override + protected Logger getLogger() { + return logger; + } + + @Override + protected TimeValue getDrainTimeout() { + return TimeValue.timeValueSeconds(2); + } + }; + Thread thread1 = new Thread(() -> { + try { + testRefreshListener.afterRefresh(true); + latch.countDown(); + } catch (IOException e) { + throw new AssertionError(e); + } + }); + Thread thread2 = new Thread(() -> { + RuntimeException ex = assertThrows(RuntimeException.class, testRefreshListener::drainRefreshes); + assertEquals("Failed to acquire all permits", ex.getMessage()); + latch.countDown(); + }); + thread1.start(); + assertBusy(() -> assertEquals(0, testRefreshListener.availablePermits())); + thread2.start(); + thread2.interrupt(); + thread1.join(); + thread2.join(); + assertEquals(0, latch.getCount()); + } + + public void testResumeRefreshesAfterDrainRefreshes() { + // This test checks the expected behaviour when the refresh listener is drained, but then refreshes are resumed again + // by closing the releasables acquired by calling the drainRefreshes method. + CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(mock(ThreadPool.class)) { + @Override + protected boolean performAfterRefreshWithPermit(boolean didRefresh) { + return true; + } + + @Override + public void beforeRefresh() {} + + @Override + protected Logger getLogger() { + return logger; + } + }; + assertRefreshListenerOpen(testRefreshListener); + Releasable releasable = testRefreshListener.drainRefreshes(); + assertRefreshListenerClosed(testRefreshListener); + releasable.close(); + assertRefreshListenerOpen(testRefreshListener); } @After @@ -458,4 +577,14 @@ public void tearDown() throws Exception { super.tearDown(); terminate(threadPool); } + + private void assertRefreshListenerClosed(CloseableRetryableRefreshListener testRefreshListener) { + assertTrue(testRefreshListener.isClosed()); + assertEquals(0, testRefreshListener.availablePermits()); + } + + private void assertRefreshListenerOpen(CloseableRetryableRefreshListener testRefreshListener) { + assertFalse(testRefreshListener.isClosed()); + assertEquals(1, testRefreshListener.availablePermits()); + } }