Skip to content

Commit

Permalink
Change AsyncRawSocketSender implementing inferface to return Future<V>
Browse files Browse the repository at this point in the history
  • Loading branch information
cosmo0920 committed May 10, 2016
1 parent 3d0acc4 commit 1bf11d7
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 26 deletions.
15 changes: 4 additions & 11 deletions src/main/java/org/fluentd/logger/sender/AsyncRawSocketSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
* @author mxk
*
*/
public class AsyncRawSocketSender implements Sender {
public class AsyncRawSocketSender implements AsyncSender {

private final class EmitRunnable implements Callable<Boolean> {
private final String tag;
Expand Down Expand Up @@ -107,21 +107,14 @@ public void close() {
}

@Override
public boolean emit(String tag, Map<String, Object> data) {
public Future<Boolean> emit(String tag, Map<String, Object> data) {
return emit(tag, System.currentTimeMillis() / 1000, data);
}

@Override
public boolean emit(final String tag, final long timestamp, final Map<String, Object> data) {
public Future<Boolean> emit(final String tag, final long timestamp, final Map<String, Object> data) {
final RawSocketSender sender = this.sender;
try {
Future<Boolean> result = senderTask.submit(new EmitRunnable(tag, data, sender, timestamp));
return result.get();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
return senderTask.submit(new EmitRunnable(tag, data, sender, timestamp));
}

@Override
Expand Down
24 changes: 24 additions & 0 deletions src/main/java/org/fluentd/logger/sender/AsyncSender.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package org.fluentd.logger.sender;

import org.fluentd.logger.errorhandler.ErrorHandler;

import java.util.Map;
import java.util.concurrent.Future;

public interface AsyncSender {
Future<Boolean> emit(String tag, Map<String, Object> data);

Future<Boolean> emit(String tag, long timestamp, Map<String, Object> data);

void flush();

void close();

String getName();

boolean isConnected();

void setErrorHandler(ErrorHandler errorHandler);

void removeErrorHandler();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -52,7 +54,7 @@ public void process(MessagePack msgpack, Socket socket) throws IOException {
fluentd.waitUntilReady();

// start asyncSenders
Sender asyncSender = new AsyncRawSocketSender("localhost", port);
AsyncSender asyncSender = new AsyncRawSocketSender("localhost", port);
Map<String, Object> data = new HashMap<String, Object>();
data.put("t1k1", "t1v1");
data.put("t1k2", "t1v2");
Expand Down Expand Up @@ -115,7 +117,7 @@ public void process(MessagePack msgpack, Socket socket) throws IOException {
fluentd.waitUntilReady();

// start asyncSenders
Sender asyncSender = new AsyncRawSocketSender("localhost", port);
AsyncSender asyncSender = new AsyncRawSocketSender("localhost", port);
int count = 10000;
for (int i = 0; i < count; i++) {
String tag = "tag:i";
Expand Down Expand Up @@ -186,7 +188,7 @@ public void process(MessagePack msgpack, Socket socket) throws IOException {
fluentds[1].waitUntilReady();

// start AsyncSenders
Sender[] asyncSenders = new Sender[2];
AsyncSender[] asyncSenders = new AsyncSender[2];
int[] counts = new int[2];
asyncSenders[0] = asyncRawSocketSender;
counts[0] = 10000;
Expand Down Expand Up @@ -254,7 +256,7 @@ public void run() {
}

@Test
public void testBufferingAndResending() throws InterruptedException, IOException {
public void testBufferingAndResending() throws InterruptedException, IOException, ExecutionException {
final ConcurrentLinkedQueue<Event> readEvents = new ConcurrentLinkedQueue<Event>();
final CountDownLatch countDownLatch = new CountDownLatch(4);
int port = MockFluentd.randomPort();
Expand All @@ -278,12 +280,12 @@ public void process(MessagePack msgpack, Socket socket) throws IOException {
fluentd.start();
fluentd.waitUntilReady();

Sender asyncSender = new AsyncRawSocketSender("localhost", port);
AsyncSender asyncSender = new AsyncRawSocketSender("localhost", port);
assertFalse(asyncSender.isConnected());
Map<String, Object> data = new HashMap<String, Object>();
data.put("key0", "v0");
boolean emitted1 = asyncSender.emit("tag0", data);
assertTrue(emitted1);
Future<Boolean> emitted1 = asyncSender.emit("tag0", data);
assertTrue(emitted1.get());

// close fluentd to make the next sending failed
TimeUnit.MILLISECONDS.sleep(500);
Expand All @@ -294,21 +296,21 @@ public void process(MessagePack msgpack, Socket socket) throws IOException {

data = new HashMap<String, Object>();
data.put("key0", "v1");
boolean emitted2 = asyncSender.emit("tag0", data);
assertTrue(emitted2);
Future<Boolean> emitted2 = asyncSender.emit("tag0", data);
assertTrue(emitted2.get());

// wait to avoid the suppression of reconnection
TimeUnit.MILLISECONDS.sleep(500);

data = new HashMap<String, Object>();
data.put("key0", "v2");
boolean emitted3 = asyncSender.emit("tag0", data);
assertTrue(emitted3);
Future<Boolean> emitted3 = asyncSender.emit("tag0", data);
assertTrue(emitted3.get());

data = new HashMap<String, Object>();
data.put("key0", "v3");
boolean emitted4 = asyncSender.emit("tag0", data);
assertTrue(emitted4);
Future<Boolean> emitted4 = asyncSender.emit("tag0", data);
assertTrue(emitted4.get());

countDownLatch.await(500, TimeUnit.MILLISECONDS);

Expand Down Expand Up @@ -382,7 +384,7 @@ public void run() {
});

// start asyncSenders
Sender asyncSender = new AsyncRawSocketSender("localhost", port);
AsyncSender asyncSender = new AsyncRawSocketSender("localhost", port);
String tag = "tag";
int i;
for (i = 0; i < 1000000; i++) { // Enough to fill the sender's buffer
Expand All @@ -392,7 +394,7 @@ public void run() {

if (bufferFull.getCount() > 0) {
// Fill the sender's buffer
if (!asyncSender.emit(tag, record)) {
if (!asyncSender.emit(tag, record).get()) {
// Buffer full. Need to recover the fluentd
bufferFull.countDown();
Thread.sleep(2000);
Expand Down

0 comments on commit 1bf11d7

Please sign in to comment.