From f917a7dfcc25ef3e89b9b89dd2f9b22cb10afca7 Mon Sep 17 00:00:00 2001 From: Tigran Mkrtchyan Date: Wed, 18 Sep 2019 20:28:01 +0200 Subject: [PATCH] rpc: remove complete timeout task after request is complete Motivation: ReplyQueue uses ScheduledThreadPoolExecutor to collect expired request. However, non expired requests (those that get reply) not removed from the scheduler's queue. Modification: Configure ScheduledThreadPoolExecutor to remove canceled tasks. Add test to demonstrate this 'leak' pattern as well as improve overall test coverage. Result: Avoid potential memory leak for high number of requests with bit timeout values. Acked-by: Marina Sahakyan Target: master, 3.1 (cherry picked from commit 9328681052e2708348445cf4fcfa4b9aae0713c4) Signed-off-by: Tigran Mkrtchyan --- .../org/dcache/oncrpc4j/rpc/ReplyQueue.java | 14 ++++- .../dcache/oncrpc4j/rpc/ReplyQueueTest.java | 61 +++++++++++++++++++ 2 files changed, 73 insertions(+), 2 deletions(-) create mode 100644 oncrpc4j-core/src/test/java/org/dcache/oncrpc4j/rpc/ReplyQueueTest.java diff --git a/oncrpc4j-core/src/main/java/org/dcache/oncrpc4j/rpc/ReplyQueue.java b/oncrpc4j-core/src/main/java/org/dcache/oncrpc4j/rpc/ReplyQueue.java index f3798a3b..d5cd862f 100644 --- a/oncrpc4j-core/src/main/java/org/dcache/oncrpc4j/rpc/ReplyQueue.java +++ b/oncrpc4j-core/src/main/java/org/dcache/oncrpc4j/rpc/ReplyQueue.java @@ -19,14 +19,15 @@ */ package org.dcache.oncrpc4j.rpc; +import com.google.common.annotations.VisibleForTesting; import java.io.EOFException; import java.net.SocketAddress; import java.nio.channels.CompletionHandler; import java.util.Collection; import java.util.Collections; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; @@ -36,7 +37,7 @@ public class ReplyQueue { - private final ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() { + private final ScheduledThreadPoolExecutor executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() { private final AtomicInteger counter = new AtomicInteger(); @Override @@ -48,6 +49,10 @@ public Thread newThread(Runnable r) { }); private final ConcurrentMap _queue = new ConcurrentHashMap<>(); + public ReplyQueue() { + executorService.setRemoveOnCancelPolicy(true); + } + /** * Register callback handler for a given xid. The Callback is called when * client receives reply from the server, request failed of expired. @@ -146,6 +151,11 @@ void failed(Throwable t) { } } + @VisibleForTesting + BlockingQueue getTimeoutQueue() { + return executorService.getQueue(); + } + /** * Shutdown all background activity, if any. */ diff --git a/oncrpc4j-core/src/test/java/org/dcache/oncrpc4j/rpc/ReplyQueueTest.java b/oncrpc4j-core/src/test/java/org/dcache/oncrpc4j/rpc/ReplyQueueTest.java new file mode 100644 index 00000000..6c63e4d9 --- /dev/null +++ b/oncrpc4j-core/src/test/java/org/dcache/oncrpc4j/rpc/ReplyQueueTest.java @@ -0,0 +1,61 @@ +package org.dcache.oncrpc4j.rpc; + +import java.io.EOFException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.channels.CompletionHandler; +import java.util.concurrent.TimeUnit; +import org.junit.Test; +import org.junit.Before; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +public class ReplyQueueTest { + + private ReplyQueue replyQueue; + private SocketAddress addr; + private CompletionHandler handler; + + @Before + public void setUp() { + replyQueue = new ReplyQueue(); + addr = mock(InetSocketAddress.class); + handler = mock(CompletionHandler.class); + } + + @Test + public void testRemoveCancel() throws EOFException { + + replyQueue.registerKey(1, addr, handler, 1, TimeUnit.MINUTES); + + assertFalse(replyQueue.getTimeoutQueue().isEmpty()); + + replyQueue.get(1); + + assertTrue(replyQueue.getTimeoutQueue().isEmpty()); + } + + @Test + public void testInvokeHandlerOnTimeout() throws EOFException, InterruptedException { + + replyQueue.registerKey(1, addr, handler, 1, TimeUnit.NANOSECONDS); + + TimeUnit.SECONDS.sleep(1); + assertTrue(replyQueue.getPendingRequests().isEmpty()); + assertTrue(replyQueue.getTimeoutQueue().isEmpty()); + verify(handler).failed(any(), any()); + } + + @Test + public void testRequestWithoutOnTimeout() throws EOFException, InterruptedException { + + replyQueue.registerKey(1, addr, handler); + assertFalse(replyQueue.getPendingRequests().isEmpty()); + assertTrue(replyQueue.getTimeoutQueue().isEmpty()); + } + +}