diff --git a/src/main/java/org/fluentd/logger/sender/AsyncRawSocketSender.java b/src/main/java/org/fluentd/logger/sender/AsyncRawSocketSender.java index 8fc2d0b..daeba4b 100644 --- a/src/main/java/org/fluentd/logger/sender/AsyncRawSocketSender.java +++ b/src/main/java/org/fluentd/logger/sender/AsyncRawSocketSender.java @@ -24,7 +24,7 @@ * @author mxk * */ -public class AsyncRawSocketSender implements Sender { +public class AsyncRawSocketSender implements AsyncSender { private final class EmitRunnable implements Callable { private final String tag; @@ -107,21 +107,14 @@ public void close() { } @Override - public boolean emit(String tag, Map data) { + public Future emit(String tag, Map data) { return emit(tag, System.currentTimeMillis() / 1000, data); } @Override - public boolean emit(final String tag, final long timestamp, final Map data) { + public Future emit(final String tag, final long timestamp, final Map data) { final RawSocketSender sender = this.sender; - try { - Future result = senderTask.submit(new EmitRunnable(tag, data, sender, timestamp)); - return result.get(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } catch (ExecutionException e) { - throw new RuntimeException(e); - } + return senderTask.submit(new EmitRunnable(tag, data, sender, timestamp)); } @Override diff --git a/src/main/java/org/fluentd/logger/sender/AsyncSender.java b/src/main/java/org/fluentd/logger/sender/AsyncSender.java new file mode 100644 index 0000000..1e9d0a8 --- /dev/null +++ b/src/main/java/org/fluentd/logger/sender/AsyncSender.java @@ -0,0 +1,24 @@ +package org.fluentd.logger.sender; + +import org.fluentd.logger.errorhandler.ErrorHandler; + +import java.util.Map; +import java.util.concurrent.Future; + +public interface AsyncSender { + Future emit(String tag, Map data); + + Future emit(String tag, long timestamp, Map data); + + void flush(); + + void close(); + + String getName(); + + boolean isConnected(); + + void setErrorHandler(ErrorHandler errorHandler); + + void removeErrorHandler(); +} diff --git a/src/test/java/org/fluentd/logger/sender/TestAsyncRawSocketSender.java b/src/test/java/org/fluentd/logger/sender/TestAsyncRawSocketSender.java index 040a03f..7f581dd 100644 --- a/src/test/java/org/fluentd/logger/sender/TestAsyncRawSocketSender.java +++ b/src/test/java/org/fluentd/logger/sender/TestAsyncRawSocketSender.java @@ -17,8 +17,10 @@ import java.util.Map; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -52,7 +54,7 @@ public void process(MessagePack msgpack, Socket socket) throws IOException { fluentd.waitUntilReady(); // start asyncSenders - Sender asyncSender = new AsyncRawSocketSender("localhost", port); + AsyncSender asyncSender = new AsyncRawSocketSender("localhost", port); Map data = new HashMap(); data.put("t1k1", "t1v1"); data.put("t1k2", "t1v2"); @@ -115,7 +117,7 @@ public void process(MessagePack msgpack, Socket socket) throws IOException { fluentd.waitUntilReady(); // start asyncSenders - Sender asyncSender = new AsyncRawSocketSender("localhost", port); + AsyncSender asyncSender = new AsyncRawSocketSender("localhost", port); int count = 10000; for (int i = 0; i < count; i++) { String tag = "tag:i"; @@ -186,7 +188,7 @@ public void process(MessagePack msgpack, Socket socket) throws IOException { fluentds[1].waitUntilReady(); // start AsyncSenders - Sender[] asyncSenders = new Sender[2]; + AsyncSender[] asyncSenders = new AsyncSender[2]; int[] counts = new int[2]; asyncSenders[0] = asyncRawSocketSender; counts[0] = 10000; @@ -254,7 +256,7 @@ public void run() { } @Test - public void testBufferingAndResending() throws InterruptedException, IOException { + public void testBufferingAndResending() throws InterruptedException, IOException, ExecutionException { final ConcurrentLinkedQueue readEvents = new ConcurrentLinkedQueue(); final CountDownLatch countDownLatch = new CountDownLatch(4); int port = MockFluentd.randomPort(); @@ -278,12 +280,12 @@ public void process(MessagePack msgpack, Socket socket) throws IOException { fluentd.start(); fluentd.waitUntilReady(); - Sender asyncSender = new AsyncRawSocketSender("localhost", port); + AsyncSender asyncSender = new AsyncRawSocketSender("localhost", port); assertFalse(asyncSender.isConnected()); Map data = new HashMap(); data.put("key0", "v0"); - boolean emitted1 = asyncSender.emit("tag0", data); - assertTrue(emitted1); + Future emitted1 = asyncSender.emit("tag0", data); + assertTrue(emitted1.get()); // close fluentd to make the next sending failed TimeUnit.MILLISECONDS.sleep(500); @@ -294,21 +296,21 @@ public void process(MessagePack msgpack, Socket socket) throws IOException { data = new HashMap(); data.put("key0", "v1"); - boolean emitted2 = asyncSender.emit("tag0", data); - assertTrue(emitted2); + Future emitted2 = asyncSender.emit("tag0", data); + assertTrue(emitted2.get()); // wait to avoid the suppression of reconnection TimeUnit.MILLISECONDS.sleep(500); data = new HashMap(); data.put("key0", "v2"); - boolean emitted3 = asyncSender.emit("tag0", data); - assertTrue(emitted3); + Future emitted3 = asyncSender.emit("tag0", data); + assertTrue(emitted3.get()); data = new HashMap(); data.put("key0", "v3"); - boolean emitted4 = asyncSender.emit("tag0", data); - assertTrue(emitted4); + Future emitted4 = asyncSender.emit("tag0", data); + assertTrue(emitted4.get()); countDownLatch.await(500, TimeUnit.MILLISECONDS); @@ -382,7 +384,7 @@ public void run() { }); // start asyncSenders - Sender asyncSender = new AsyncRawSocketSender("localhost", port); + AsyncSender asyncSender = new AsyncRawSocketSender("localhost", port); String tag = "tag"; int i; for (i = 0; i < 1000000; i++) { // Enough to fill the sender's buffer @@ -392,7 +394,7 @@ public void run() { if (bufferFull.getCount() > 0) { // Fill the sender's buffer - if (!asyncSender.emit(tag, record)) { + if (!asyncSender.emit(tag, record).get()) { // Buffer full. Need to recover the fluentd bufferFull.countDown(); Thread.sleep(2000);