Skip to content

Commit

Permalink
Make StatsD client queue size and telemetry configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
mathispesch authored and truthbk committed Mar 14, 2022
1 parent d617245 commit 109fed1
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 28 deletions.
43 changes: 37 additions & 6 deletions src/main/java/org/datadog/jmxfetch/AppConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@

import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;

import lombok.Builder;
import lombok.ToString;
import org.datadog.jmxfetch.converter.ExitWatcherConverter;
import org.datadog.jmxfetch.converter.ReporterConverter;
import org.datadog.jmxfetch.reporter.ConsoleReporter;
import org.datadog.jmxfetch.reporter.JsonReporter;
import org.datadog.jmxfetch.reporter.Reporter;
import org.datadog.jmxfetch.reporter.ReporterFactory;
import org.datadog.jmxfetch.service.ServiceNameProvider;
import org.datadog.jmxfetch.validator.LogLevelValidator;
import org.datadog.jmxfetch.validator.PositiveIntegerValidator;
Expand Down Expand Up @@ -57,6 +56,9 @@ public class AppConfig {
private static final int DEFAULT_THREAD_POOL_SIZE = 3;
private static final int DEFAULT_COLLECTION_TO_S = 60;
private static final int DEFAULT_RECONNECTION_TO_S = 60;
private static final int DEFAULT_STATSD_QUEUE_SIZE = 4096;

private Reporter reporter;

@Parameter(
names = {"--help", "-h"},
Expand Down Expand Up @@ -111,9 +113,22 @@ public class AppConfig {
+ "\"statsd:unix://[STATSD_UNIX_SOCKET_PATH]\", "
+ "\"console\" or \"json\"",
validateWith = ReporterValidator.class,
converter = ReporterConverter.class,
required = true)
private String reporterString;

@Parameter(
names = {"--statsd_telemetry", "-st"},
description = "Enable StatsD client telemetry reporting",
required = false)
private Reporter reporter;
private boolean statsdTelemetry;

@Parameter(
names = {"--statsd_queue_size", "-sq"},
description = "Maximum number of unprocessed messages in the StatsD client queue.",
validateWith = PositiveIntegerValidator.class,
required = false)
@Builder.Default
private int statsdQueueSize = DEFAULT_STATSD_QUEUE_SIZE;

@Parameter(
names = {"--check", "-c"},
Expand Down Expand Up @@ -290,11 +305,11 @@ public String getAction() {
}

public boolean isConsoleReporter() {
return reporter != null && (reporter instanceof ConsoleReporter);
return getReporter() != null && (getReporter() instanceof ConsoleReporter);
}

public boolean isJsonReporter() {
return reporter != null && (reporter instanceof JsonReporter);
return getReporter() != null && (getReporter() instanceof JsonReporter);
}

public boolean isHelp() {
Expand Down Expand Up @@ -345,7 +360,11 @@ public boolean getAutoDiscoveryPipeEnabled() {
return adEnabled;
}

/** Returns the Reporter for this app config. */
public Reporter getReporter() {
if (reporter == null && reporterString != null) {
reporter = ReporterFactory.getReporter(this);
}
return reporter;
}

Expand All @@ -361,6 +380,18 @@ public String getTmpDirectory() {
return tmpDirectory;
}

public String getReporterString() {
return reporterString;
}

public boolean getStatsdTelemetry() {
return statsdTelemetry;
}

public int getStatsdQueueSize() {
return statsdQueueSize;
}

public String getLogLevel() {
return logLevel;
}
Expand Down

This file was deleted.

23 changes: 16 additions & 7 deletions src/main/java/org/datadog/jmxfetch/reporter/ReporterFactory.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package org.datadog.jmxfetch.reporter;

import org.datadog.jmxfetch.util.StringUtils;
import org.datadog.jmxfetch.AppConfig;

import java.util.Arrays;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ReporterFactory {

/** Gets the reporter for the corresponding type string (console, statsd). */
public static Reporter getReporter(String type) {
/** Gets the reporter for the corresponding app config. */
public static Reporter getReporter(AppConfig appConfig) {
String type = appConfig.getReporterString();
if (type == null || type.length() <= 0) {
throw new IllegalArgumentException("Null or empty reporter type");
}
Expand All @@ -23,17 +23,26 @@ public static Reporter getReporter(String type) {
if (matcher.find() && matcher.groupCount() == 2) {
String host = matcher.group(1);
Integer port = Integer.valueOf(matcher.group(2));
return new StatsdReporter(host, port);
return new StatsdReporter(
host,
port,
appConfig.getStatsdTelemetry(),
appConfig.getStatsdQueueSize());
}

matcher = Pattern.compile("^statsd:unix://(.*)$").matcher(type);
if (matcher.find() && matcher.groupCount() == 1) {
String socketPath = matcher.group(1);
return new StatsdReporter(socketPath, 0);
return new StatsdReporter(
socketPath,
0,
appConfig.getStatsdTelemetry(),
appConfig.getStatsdQueueSize());
}
}
throw new IllegalArgumentException("Invalid reporter type: " + type);
}

private ReporterFactory() {}
private ReporterFactory() {
}
}
20 changes: 18 additions & 2 deletions src/main/java/org/datadog/jmxfetch/reporter/StatsdReporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@ public class StatsdReporter extends Reporter {
private StatsDClient statsDClient;
private String statsdHost;
private int statsdPort;
private Boolean telemetry;
private int queueSize;
private long initializationTime;

/** Constructor, instantiates statsd reported to provided host and port. */
public StatsdReporter(String statsdHost, int statsdPort) {
public StatsdReporter(String statsdHost, int statsdPort, boolean telemetry, int queueSize) {
this.statsdHost = statsdHost;
this.statsdPort = statsdPort;
this.telemetry = telemetry;
this.queueSize = queueSize;
this.init();
}

Expand All @@ -34,10 +38,14 @@ private void init() {
/* Create the StatsDClient with "entity-id" set to "none" to avoid
having dogstatsd server adding origin tags, when the connection is
done with UDS. */
log.info("Initializing Statsd reporter with parameters host={} port={} telemetry={} "
+ "queueSize={} entityId={}",
this.statsdHost, this.statsdPort, this.telemetry, this.queueSize, entityId);
NonBlockingStatsDClientBuilder builder = new NonBlockingStatsDClientBuilder()
.enableTelemetry(false)
.hostname(this.statsdHost)
.port(this.statsdPort)
.enableTelemetry(this.telemetry)
.queueSize(this.queueSize)
.errorHandler(handler)
.entityID(entityId);

Expand Down Expand Up @@ -105,4 +113,12 @@ public String getStatsdHost() {
public int getStatsdPort() {
return statsdPort;
}

public boolean getTelemetry() {
return telemetry;
}

public int getQueueSize() {
return queueSize;
}
}
27 changes: 26 additions & 1 deletion src/test/java/org/datadog/jmxfetch/TestParsingJCommander.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.ParameterException;
Expand All @@ -23,7 +24,7 @@ public class TestParsingJCommander {
private static final String SINGLE_CHECK = "jmx.yaml";
private static final List<String> MULTI_CHECK = Arrays.asList("jmx.yaml", "jmx-2.yaml");
private static final String STATUS_LOCATION = "/status/status_location";
private static final String EXIT_FILE_LOCATION = "/status/exit_locationt";
private static final String EXIT_FILE_LOCATION = "/status/exit_location";
private static final String IPC_HOSTNAME = "localhost";
private static final String IPC_PORT = "5001";

Expand Down Expand Up @@ -161,6 +162,8 @@ public void testParsingReporter() {
assertTrue(appConfig.getReporter() instanceof StatsdReporter);
assertEquals("localhost", ((StatsdReporter) appConfig.getReporter()).getStatsdHost());
assertEquals(10, ((StatsdReporter) appConfig.getReporter()).getStatsdPort());
assertFalse(((StatsdReporter) appConfig.getReporter()).getTelemetry());
assertEquals(4096, ((StatsdReporter) appConfig.getReporter()).getQueueSize());

// statsd reporter with custom ipv4 host
params =
Expand Down Expand Up @@ -232,6 +235,28 @@ public void testParsingReporter() {
assertTrue(appConfig.getReporter() instanceof StatsdReporter);
assertEquals("/path/to/dsd.socket", ((StatsdReporter) appConfig.getReporter()).getStatsdHost());
assertEquals(0, ((StatsdReporter) appConfig.getReporter()).getStatsdPort());

// Telemetry and queue size
params =
new String[] {
"-r",
"statsd:unix:///path/to/dsd.socket",
"-st",
"-sq",
"8192",
"--check",
SINGLE_CHECK,
"--conf_directory",
CONF_DIR,
AppConfig.ACTION_COLLECT
};
appConfig = testCommand(params);
assertNotNull(appConfig.getReporter());
assertTrue(appConfig.getReporter() instanceof StatsdReporter);
assertEquals("/path/to/dsd.socket", ((StatsdReporter) appConfig.getReporter()).getStatsdHost());
assertEquals(0, ((StatsdReporter) appConfig.getReporter()).getStatsdPort());
assertTrue(((StatsdReporter) appConfig.getReporter()).getTelemetry());
assertEquals(8192, ((StatsdReporter) appConfig.getReporter()).getQueueSize());
}

@Test
Expand Down

0 comments on commit 109fed1

Please sign in to comment.