From 73029f49565f30dd036252da647a0108f746576b Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Wed, 20 Apr 2016 17:23:04 +0900 Subject: [PATCH] Add test case for buffering and resending --- .../sender/TestAsyncRawSocketSender.java | 89 +++++++++++++++++++ 1 file changed, 89 insertions(+) diff --git a/src/test/java/org/fluentd/logger/sender/TestAsyncRawSocketSender.java b/src/test/java/org/fluentd/logger/sender/TestAsyncRawSocketSender.java index 067e83c..8af70ed 100644 --- a/src/test/java/org/fluentd/logger/sender/TestAsyncRawSocketSender.java +++ b/src/test/java/org/fluentd/logger/sender/TestAsyncRawSocketSender.java @@ -248,4 +248,93 @@ public void run() { assertTrue(socketFinished.get()); executor.shutdownNow(); } + + @Test + public void testBufferingAndResending() throws InterruptedException, IOException { + final ConcurrentLinkedQueue readEvents = new ConcurrentLinkedQueue(); + final CountDownLatch countDownLatch = new CountDownLatch(4); + int port = MockFluentd.randomPort(); + MockProcess mockProcess = new MockFluentd.MockProcess() { + public void process(MessagePack msgpack, Socket socket) throws IOException { + BufferedInputStream in = new BufferedInputStream(socket.getInputStream()); + try { + Unpacker unpacker = msgpack.createUnpacker(in); + while (true) { + Event e = unpacker.read(Event.class); + readEvents.add(e); + countDownLatch.countDown(); + } + } catch (EOFException e) { + // e.printStackTrace(); + } + } + }; + + MockFluentd fluentd = new MockFluentd(port, mockProcess); + fluentd.start(); + + Sender asyncSender = new AsyncRawSocketSender("localhost", port); + assertFalse(asyncSender.isConnected()); + Map data = new HashMap(); + data.put("key0", "v0"); + boolean emitted1 = asyncSender.emit("tag0", data); + assertTrue(emitted1); + + // close fluentd to make the next sending failed + TimeUnit.MILLISECONDS.sleep(500); + + fluentd.closeClientSockets(); + + TimeUnit.MILLISECONDS.sleep(500); + + data = new HashMap(); + data.put("key0", "v1"); + boolean emitted2 = asyncSender.emit("tag0", data); + assertTrue(emitted2); + + // wait to avoid the suppression of reconnection + TimeUnit.MILLISECONDS.sleep(500); + + data = new HashMap(); + data.put("key0", "v2"); + boolean emitted3 = asyncSender.emit("tag0", data); + assertTrue(emitted3); + + data = new HashMap(); + data.put("key0", "v3"); + boolean emitted4 = asyncSender.emit("tag0", data); + assertTrue(emitted4); + + countDownLatch.await(500, TimeUnit.MILLISECONDS); + + asyncSender.close(); + + fluentd.close(); + + assertEquals(4, readEvents.size()); + + Event event = readEvents.poll(); + assertEquals("tag0", event.tag); + assertEquals(1, event.data.size()); + assertTrue(event.data.keySet().contains("key0")); + assertTrue(event.data.values().contains("v0")); + + event = readEvents.poll(); + assertEquals("tag0", event.tag); + assertEquals(1, event.data.size()); + assertTrue(event.data.keySet().contains("key0")); + assertTrue(event.data.values().contains("v1")); + + event = readEvents.poll(); + assertEquals("tag0", event.tag); + assertEquals(1, event.data.size()); + assertTrue(event.data.keySet().contains("key0")); + assertTrue(event.data.values().contains("v2")); + + event = readEvents.poll(); + assertEquals("tag0", event.tag); + assertEquals(1, event.data.size()); + assertTrue(event.data.keySet().contains("key0")); + assertTrue(event.data.values().contains("v3")); + } }