Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using async queue instead of submit new Runnable each time #25

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
45 changes: 28 additions & 17 deletions src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.Locale;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.text.NumberFormat;

/**
Expand All @@ -35,9 +32,10 @@
* on any StatsD clients.</p>
*
* @author Tom Denley
*
* @author Mauro Franceschini
*/
public final class NonBlockingStatsDClient extends ConvenienceMethodProvidingStatsDClient {
private static final int STATS_QUEUE_MAX_SIZE = 64 * 1024;

private static final Charset STATS_D_ENCODING = Charset.forName("UTF-8");

Expand All @@ -48,6 +46,9 @@ public final class NonBlockingStatsDClient extends ConvenienceMethodProvidingSta
private final String prefix;
private final DatagramSocket clientSocket;
private final StatsDClientErrorHandler handler;
private final BlockingQueue<String> statsQueue;
private Future<?> executorFuture;
private boolean stopping = false;

private final ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
final ThreadFactory delegate = Executors.defaultThreadFactory();
Expand Down Expand Up @@ -107,13 +108,27 @@ public NonBlockingStatsDClient(String prefix, String hostname, int port) throws
public NonBlockingStatsDClient(String prefix, String hostname, int port, StatsDClientErrorHandler errorHandler) throws StatsDClientException {
this.prefix = (prefix == null || prefix.trim().isEmpty()) ? "" : (prefix.trim() + ".");
this.handler = errorHandler;
this.statsQueue = new ArrayBlockingQueue<String>(STATS_QUEUE_MAX_SIZE);

try {
this.clientSocket = new DatagramSocket();
this.clientSocket.connect(new InetSocketAddress(hostname, port));
} catch (Exception e) {
throw new StatsDClientException("Failed to start StatsD client", e);
}

this.executorFuture = executor.submit(new Runnable() {
@Override
public void run() {
while (!stopping) {
try {
blockingSend(statsQueue.take());
} catch (InterruptedException ex) {
handler.handle(ex);
}
}
}
});
}

/**
Expand All @@ -123,6 +138,8 @@ public NonBlockingStatsDClient(String prefix, String hostname, int port, StatsDC
@Override
public void stop() {
try {
stopping = true;
executorFuture.cancel(true);
executor.shutdown();
executor.awaitTermination(30, TimeUnit.SECONDS);
}
Expand Down Expand Up @@ -151,7 +168,7 @@ public void stop() {
*/
@Override
public void count(String aspect, long delta, double sampleRate) {
send(messageFor(aspect, Long.toString(delta), "c", sampleRate));
send(messageFor(aspect, delta, "c", sampleRate));
}

/**
Expand All @@ -169,7 +186,6 @@ public void recordGaugeValue(String aspect, long value) {
recordGaugeCommon(aspect, Long.toString(value), value < 0, false);
}

@Override
public void recordGaugeValue(String aspect, double value) {
recordGaugeCommon(aspect, stringValueOf(value), value < 0, false);
}
Expand All @@ -179,7 +195,6 @@ public void recordGaugeDelta(String aspect, long value) {
recordGaugeCommon(aspect, Long.toString(value), value < 0, true);
}

@Override
public void recordGaugeDelta(String aspect, double value) {
recordGaugeCommon(aspect, stringValueOf(value), value < 0, true);
}
Expand Down Expand Up @@ -221,25 +236,21 @@ public void recordSetEvent(String aspect, String eventName) {
*/
@Override
public void recordExecutionTime(String aspect, long timeInMs, double sampleRate) {
send(messageFor(aspect, Long.toString(timeInMs), "ms", sampleRate));
send(messageFor(aspect, timeInMs, "ms", sampleRate));
}

private String messageFor(String aspect, String value, String type) {
private String messageFor(String aspect, Object value, String type) {
return messageFor(aspect, value, type, 1.0);
}

private String messageFor(String aspect, String value, String type, double sampleRate) {
private String messageFor(String aspect, Object value, String type, double sampleRate) {
final String messageFormat = (sampleRate == 1.0) ? "%s%s:%s|%s" : "%s%s:%s|%s@%f";
return String.format((Locale)null, messageFormat, prefix, aspect, value, type, sampleRate);
return String.format(Locale.US, messageFormat, prefix, aspect, value, type, sampleRate);
}

private void send(final String message) {
try {
executor.execute(new Runnable() {
@Override public void run() {
blockingSend(message);
}
});
statsQueue.add(message);
}
catch (Exception e) {
handler.handle(e);
Expand Down