diff --git a/docker/controller-node/60-add-script-signature-approvals.groovy b/docker/controller-node/60-add-script-signature-approvals.groovy
new file mode 100644
index 00000000..4b92f0bd
--- /dev/null
+++ b/docker/controller-node/60-add-script-signature-approvals.groovy
@@ -0,0 +1,22 @@
+import org.jenkinsci.plugins.scriptsecurity.scripts.ScriptApproval
+
+// needed by the stress-test-traces-submit pipeline
+def scriptApproval = ScriptApproval.get()
+def signatures = [
+ "field org.datadog.jenkins.plugins.datadog.traces.write.TraceWriter asyncWriter",
+ "field org.datadog.jenkins.plugins.datadog.util.AsyncWriter queue",
+ "method java.util.concurrent.BlockingQueue remainingCapacity",
+ "new java.util.concurrent.atomic.AtomicBoolean boolean",
+ "method java.util.concurrent.atomic.AtomicBoolean get",
+ "method java.util.concurrent.atomic.AtomicBoolean set boolean",
+ "staticMethod org.datadog.jenkins.plugins.datadog.traces.write.TraceWriterFactory getTraceWriter",
+]
+
+signatures.each { signature ->
+ if (!scriptApproval.getPendingSignatures().any { it.signature == signature }) {
+ scriptApproval.approveSignature(signature)
+ println "Approved signature: $signature"
+ } else {
+ println "Signature already pending or approved: $signature"
+ }
+}
diff --git a/docker/controller-node/Dockerfile b/docker/controller-node/Dockerfile
index 2d505c51..1af9b262 100644
--- a/docker/controller-node/Dockerfile
+++ b/docker/controller-node/Dockerfile
@@ -34,6 +34,7 @@ COPY 30-create-github-credentials.groovy /usr/share/jenkins/ref/init.groovy.d/30
COPY 40-create-datadog-key_credentials.groovy /usr/share/jenkins/ref/init.groovy.d/40-create-datadog-key_credentials.groovy
COPY 50-create-jobs.groovy /usr/share/jenkins/ref/init.groovy.d/50-create-jobs.groovy
COPY 51-create-multi-branch-pipeline.groovy /usr/share/jenkins/ref/init.groovy.d/51-create-multi-branch-pipeline.groovy
+COPY 60-add-script-signature-approvals.groovy /usr/share/jenkins/ref/init.groovy.d/60-add-script-signature-approvals.groovy
RUN /var/jenkins_home/add-github-to-known-hosts.sh
diff --git a/docker/controller-node/jobs/stress-test-traces-submit.cps b/docker/controller-node/jobs/stress-test-traces-submit.cps
new file mode 100644
index 00000000..5d30c01c
--- /dev/null
+++ b/docker/controller-node/jobs/stress-test-traces-submit.cps
@@ -0,0 +1,51 @@
+import java.util.concurrent.atomic.AtomicBoolean
+
+node {
+ def numBuilds = params.NUM_BUILDS ?: 10
+ def numSteps = params.NUM_STEPS ?: 100
+ def jobComplete = new AtomicBoolean(false)
+
+ def parallelStages = [:]
+
+ parallelStages['Generate jobs'] = {
+ stage('Generate jobs') {
+ def builds = [:]
+
+ // Loop to create parallel jobs
+ for (int i = 1; i <= numBuilds; i++) {
+ def jobIndex = i
+ builds["Job-${jobIndex}"] = {
+ echo "Starting Job ${jobIndex}"
+
+ // Inner loop to create steps within each job
+ for (int j = 1; j <= numSteps; j++) {
+ echo "Executing step ${j} in Job ${jobIndex}"
+
+ // Execute a shell command to echo random characters
+ sh "echo ${UUID.randomUUID()}"
+ }
+
+ echo "Finished Load Job ${jobIndex}"
+ }
+ }
+
+ // Execute all jobs in parallel
+ parallel builds
+ jobComplete.set(true)
+ }
+ }
+
+ parallelStages['Print traces queue capacity'] = {
+ stage('Print traces queue capacity') {
+ script {
+ waitUntil {
+ echo "Remaining traces queue capacity ${org.datadog.jenkins.plugins.datadog.traces.write.TraceWriterFactory.getTraceWriter().asyncWriter.queue.remainingCapacity()}"
+ sleep time: 1, unit: 'SECONDS'
+ return jobComplete.get()
+ }
+ }
+ }
+ }
+
+ parallel parallelStages
+}
diff --git a/pom.xml b/pom.xml
index 3fac53e5..72126d64 100644
--- a/pom.xml
+++ b/pom.xml
@@ -215,6 +215,16 @@
access-modifier-suppressions
${access-modifier-checker.version}
+
+ io.dropwizard.metrics
+ metrics-core
+ 4.2.28
+
+
+ io.dropwizard.metrics
+ metrics-json
+ 4.2.28
+
diff --git a/src/main/java/org/datadog/jenkins/plugins/datadog/DatadogUtilities.java b/src/main/java/org/datadog/jenkins/plugins/datadog/DatadogUtilities.java
index 36d7fb06..aceb4c81 100644
--- a/src/main/java/org/datadog/jenkins/plugins/datadog/DatadogUtilities.java
+++ b/src/main/java/org/datadog/jenkins/plugins/datadog/DatadogUtilities.java
@@ -1237,24 +1237,24 @@ public static String getDatadogPluginVersion() {
}
public static int envVar(String name, int defaultValue) {
- String value = System.getenv(name);
- if (value != null) {
- try {
- return Integer.parseInt(value);
- } catch (Exception e) {
- DatadogUtilities.severe(logger, null, "Invalid value " + value + " provided for env var " + name + ": integer number expected");
- }
- }
- return defaultValue;
+ return envVar(name, Integer::parseInt, defaultValue);
}
public static double envVar(String name, double defaultValue) {
+ return envVar(name, Double::parseDouble, defaultValue);
+ }
+
+ public static boolean envVar(String name, boolean defaultValue) {
+ return envVar(name, Boolean::parseBoolean, defaultValue);
+ }
+
+ public static T envVar(String name, Function parser, T defaultValue) {
String value = System.getenv(name);
if (value != null) {
try {
- return Double.parseDouble(value);
+ return parser.apply(value);
} catch (Exception e) {
- DatadogUtilities.severe(logger, null, "Invalid value " + value + " provided for env var " + name + ": floating point number expected");
+ DatadogUtilities.severe(logger, null, "Invalid value " + value + " provided for env var " + name);
}
}
return defaultValue;
diff --git a/src/main/java/org/datadog/jenkins/plugins/datadog/clients/CompressedBatchSender.java b/src/main/java/org/datadog/jenkins/plugins/datadog/clients/CompressedBatchSender.java
new file mode 100644
index 00000000..23d99a0f
--- /dev/null
+++ b/src/main/java/org/datadog/jenkins/plugins/datadog/clients/CompressedBatchSender.java
@@ -0,0 +1,71 @@
+package org.datadog.jenkins.plugins.datadog.clients;
+
+import net.sf.json.JSONObject;
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.logging.Logger;
+import java.util.zip.GZIPOutputStream;
+
+public class CompressedBatchSender implements JsonPayloadSender {
+
+ private static final Logger logger = Logger.getLogger(CompressedBatchSender.class.getName());
+
+ private static final byte[] BEGIN_JSON_ARRAY = "[".getBytes(StandardCharsets.UTF_8);
+ private static final byte[] END_JSON_ARRAY = "]".getBytes(StandardCharsets.UTF_8);
+ private static final byte[] COMMA = ",".getBytes(StandardCharsets.UTF_8);
+
+ private final HttpClient httpClient;
+ private final String url;
+ private final Map headers;
+ private final int batchLimitBytes;
+ private final Function payloadToJson;
+
+ public CompressedBatchSender(HttpClient httpClient,
+ String url,
+ Map headers,
+ int batchLimitBytes,
+ Function payloadToJson) {
+ this.httpClient = httpClient;
+ this.url = url;
+ this.headers = headers;
+ this.batchLimitBytes = batchLimitBytes;
+ this.payloadToJson = payloadToJson;
+ }
+
+ @Override
+ public void send(Collection payloads) throws Exception {
+ ByteArrayOutputStream request = new ByteArrayOutputStream();
+ GZIPOutputStream gzip = new GZIPOutputStream(request);
+ // the backend checks the size limit against the uncompressed body of the request
+ int uncompressedRequestLength = 0;
+
+ for (T payload : payloads) {
+ JSONObject json = payloadToJson.apply(payload);
+ byte[] body = json.toString().getBytes(StandardCharsets.UTF_8);
+ if (body.length + 2 > batchLimitBytes) { // + 2 is for array beginning and end: []
+ logger.severe("Dropping a payload because size (" + body.length + ") exceeds the allowed limit of " + batchLimitBytes);
+ continue;
+ }
+
+ if (uncompressedRequestLength + body.length + 2 > batchLimitBytes) { // + 2 is for comma and array end: ,]
+ gzip.write(END_JSON_ARRAY);
+ gzip.close();
+ httpClient.post(url, headers, "application/json", request.toByteArray(), Function.identity());
+ request = new ByteArrayOutputStream();
+ gzip = new GZIPOutputStream(request);
+ uncompressedRequestLength = 0;
+ }
+
+ gzip.write(uncompressedRequestLength == 0 ? BEGIN_JSON_ARRAY : COMMA);
+ gzip.write(body);
+ uncompressedRequestLength += body.length + 1;
+ }
+
+ gzip.write(END_JSON_ARRAY);
+ gzip.close();
+ httpClient.post(url, headers, "application/json", request.toByteArray(), Function.identity());
+ }
+}
diff --git a/src/main/java/org/datadog/jenkins/plugins/datadog/clients/DatadogAgentClient.java b/src/main/java/org/datadog/jenkins/plugins/datadog/clients/DatadogAgentClient.java
index 24b525a8..22be0603 100644
--- a/src/main/java/org/datadog/jenkins/plugins/datadog/clients/DatadogAgentClient.java
+++ b/src/main/java/org/datadog/jenkins/plugins/datadog/clients/DatadogAgentClient.java
@@ -25,6 +25,8 @@ of this software and associated documentation files (the "Software"), to deal
package org.datadog.jenkins.plugins.datadog.clients;
+import static org.datadog.jenkins.plugins.datadog.traces.write.TraceWriteStrategy.ENABLE_TRACES_BATCHING_ENV_VAR;
+
import com.timgroup.statsd.Event;
import com.timgroup.statsd.NonBlockingStatsDClient;
import com.timgroup.statsd.ServiceCheck;
@@ -301,7 +303,7 @@ private static final class AgentLogWriteStrategy implements LogWriteStrategy {
private final String host;
private final int port;
- private final CircuitBreaker> circuitBreaker;
+ private final CircuitBreaker> circuitBreaker;
private Socket socket;
private OutputStream out;
@@ -318,17 +320,17 @@ private AgentLogWriteStrategy(String host, int port) {
CircuitBreaker.DEFAULT_DELAY_FACTOR);
}
- public void send(List payloads) {
+ public void send(List payloads) {
circuitBreaker.accept(payloads);
}
- private void doSend(List payloads) throws Exception {
+ private void doSend(List payloads) throws Exception {
if (socket == null || socket.isClosed() || !socket.isConnected()) {
socket = new Socket(host, port);
out = new BufferedOutputStream(socket.getOutputStream());
}
- for (String payload : payloads) {
- out.write(payload.getBytes(StandardCharsets.UTF_8));
+ for (net.sf.json.JSONObject payload : payloads) {
+ out.write(payload.toString().getBytes(StandardCharsets.UTF_8));
out.write(LINE_SEPARATOR);
}
}
@@ -338,7 +340,7 @@ private void handleError(Exception e) {
DatadogUtilities.severe(logger, e, "Could not write logs to agent");
}
- private void fallback(List payloads) {
+ private void fallback(List payloads) {
// cannot establish connection to agent, do nothing
}
@@ -368,7 +370,26 @@ private void flushSafely() throws IOException {
@Override
public TraceWriteStrategy createTraceWriteStrategy() {
- TraceWriteStrategyImpl evpStrategy = new TraceWriteStrategyImpl(Track.WEBHOOK, this::sendSpansToWebhook);
+ DatadogGlobalConfiguration datadogGlobalDescriptor = DatadogUtilities.getDatadogGlobalDescriptor();
+ String urlParameters = datadogGlobalDescriptor != null ? "?service=" + datadogGlobalDescriptor.getCiInstanceName() : "";
+ String url = String.format("http://%s:%d/evp_proxy/v1/api/v2/webhook/%s", hostname, traceCollectionPort, urlParameters);
+
+ // TODO use CompressedBatchSender unconditionally in the next release
+ JsonPayloadSender payloadSender;
+ if (DatadogUtilities.envVar(ENABLE_TRACES_BATCHING_ENV_VAR, false)) {
+ Map headers = Map.of(
+ "X-Datadog-EVP-Subdomain", "webhook-intake",
+ "DD-CI-PROVIDER-NAME", "jenkins",
+ "Content-Encoding", "gzip");
+ payloadSender = new CompressedBatchSender<>(client, url, headers, PAYLOAD_SIZE_LIMIT, p -> p.getJson());
+ } else {
+ Map headers = Map.of(
+ "X-Datadog-EVP-Subdomain", "webhook-intake",
+ "DD-CI-PROVIDER-NAME", "jenkins");
+ payloadSender = new SimpleSender<>(client, url, headers, p -> p.getJson());
+ }
+
+ TraceWriteStrategyImpl evpStrategy = new TraceWriteStrategyImpl(Track.WEBHOOK, payloadSender::send);
TraceWriteStrategyImpl apmStrategy = new TraceWriteStrategyImpl(Track.APM, this::sendSpansToApm);
return new AgentTraceWriteStrategy(evpStrategy, apmStrategy, this::isEvpProxySupported);
}
@@ -410,36 +431,6 @@ Set fetchAgentSupportedEndpoints() {
}
}
- /**
- * Posts a given payload to the Agent EVP Proxy, so it is forwarded to the Webhook Intake.
- */
- private void sendSpansToWebhook(Collection spans) {
- DatadogGlobalConfiguration datadogGlobalDescriptor = DatadogUtilities.getDatadogGlobalDescriptor();
- String urlParameters = datadogGlobalDescriptor != null ? "?service=" + datadogGlobalDescriptor.getCiInstanceName() : "";
- String url = String.format("http://%s:%d/evp_proxy/v1/api/v2/webhook/%s", hostname, traceCollectionPort, urlParameters);
-
- Map headers = new HashMap<>();
- headers.put("X-Datadog-EVP-Subdomain", "webhook-intake");
- headers.put("DD-CI-PROVIDER-NAME", "jenkins");
-
- for (Payload span : spans) {
- if (span.getTrack() != Track.WEBHOOK) {
- logger.severe("Expected webhook track, got " + span.getTrack() + ", dropping span");
- continue;
- }
-
- byte[] body = span.getJson().toString().getBytes(StandardCharsets.UTF_8);
- if (body.length > PAYLOAD_SIZE_LIMIT) {
- logger.severe("Dropping span because payload size (" + body.length + ") exceeds the allowed limit of " + PAYLOAD_SIZE_LIMIT);
- continue;
- }
-
- // webhook intake does not support batch requests
- logger.fine("Sending webhook");
- client.postAsynchronously(url, headers, "application/json", body);
- }
- }
-
private void sendSpansToApm(Collection spans) {
try {
Map tracesById = new HashMap<>();
diff --git a/src/main/java/org/datadog/jenkins/plugins/datadog/clients/DatadogApiClient.java b/src/main/java/org/datadog/jenkins/plugins/datadog/clients/DatadogApiClient.java
index 129e2a7d..71ce4325 100644
--- a/src/main/java/org/datadog/jenkins/plugins/datadog/clients/DatadogApiClient.java
+++ b/src/main/java/org/datadog/jenkins/plugins/datadog/clients/DatadogApiClient.java
@@ -25,7 +25,15 @@ of this software and associated documentation files (the "Software"), to deal
package org.datadog.jenkins.plugins.datadog.clients;
+import static org.datadog.jenkins.plugins.datadog.traces.write.TraceWriteStrategy.ENABLE_TRACES_BATCHING_ENV_VAR;
+
import hudson.util.Secret;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.logging.Logger;
import net.sf.json.JSON;
import net.sf.json.JSONArray;
import net.sf.json.JSONObject;
@@ -44,15 +52,6 @@ of this software and associated documentation files (the "Software"), to deal
import org.datadog.jenkins.plugins.datadog.util.SuppressFBWarnings;
import org.datadog.jenkins.plugins.datadog.util.TagsUtil;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.logging.Logger;
-import java.util.zip.GZIPOutputStream;
-
/**
* This class is used to collect all methods that has to do with transmitting
* data to Datadog.
@@ -68,7 +67,6 @@ public class DatadogApiClient implements DatadogClient {
private static final String SERVICECHECK = "v1/check_run";
private static final String VALIDATE = "v1/validate";
-
/* Timeout of 1 minutes for connecting and reading.
* this prevents this plugin from causing jobs to hang in case of
* flaky network or Datadog being down. Left intentionally long.
@@ -159,24 +157,17 @@ public static boolean validateWebhookIntakeConnection(String webhookIntakeUrl, S
}
public static boolean validateLogIntakeConnection(String logsIntakeUrl, Secret apiKey) {
- String payload = "{\"message\":\"[datadog-plugin] Check connection\", " +
- "\"ddsource\":\"Jenkins\", \"service\":\"Jenkins\", " +
- "\"hostname\":\"" + DatadogUtilities.getHostname(null) + "\"}";
- return postLogs(new HttpClient(HTTP_TIMEOUT_MS), logsIntakeUrl, apiKey, payload);
- }
-
- private static boolean postLogs(HttpClient httpClient, String logIntakeUrl, Secret apiKey, String payload) {
- if(payload == null){
- logger.fine("No payload to post");
- return true;
- }
+ HttpClient httpClient = new HttpClient(HTTP_TIMEOUT_MS);
Map headers = new HashMap<>();
headers.put("DD-API-KEY", Secret.toString(apiKey));
+ String payload = "{\"message\":\"[datadog-plugin] Check connection\", " +
+ "\"ddsource\":\"Jenkins\", \"service\":\"Jenkins\", " +
+ "\"hostname\":\"" + DatadogUtilities.getHostname(null) + "\"}";
byte[] body = payload.getBytes(StandardCharsets.UTF_8);
try {
- httpClient.postAsynchronously(logIntakeUrl, headers, "application/json", body);
+ httpClient.post(logsIntakeUrl, headers, "application/json", body, Function.identity());
return true;
} catch (Exception e) {
DatadogUtilities.severe(logger, e, "Failed to post logs");
@@ -325,22 +316,21 @@ public LogWriteStrategy createLogWriteStrategy() {
}
private static final class ApiLogWriteStrategy implements LogWriteStrategy {
- private static final byte[] BEGIN_JSON_ARRAY = "[".getBytes(StandardCharsets.UTF_8);
- private static final byte[] END_JSON_ARRAY = "]".getBytes(StandardCharsets.UTF_8);
- private static final byte[] COMMA = ",".getBytes(StandardCharsets.UTF_8);
-
- private final String logIntakeUrl;
- private final Secret apiKey;
- private final HttpClient httpClient;
-
- private final CircuitBreaker> circuitBreaker;
+ private final CircuitBreaker> circuitBreaker;
public ApiLogWriteStrategy(String logIntakeUrl, Secret apiKey, HttpClient httpClient) {
- this.logIntakeUrl = logIntakeUrl;
- this.apiKey = apiKey;
- this.httpClient = httpClient;
+ Map headers = Map.of(
+ "DD-API-KEY", Secret.toString(apiKey),
+ "Content-Encoding", "gzip");
+ JsonPayloadSender payloadSender = new CompressedBatchSender<>(
+ httpClient,
+ logIntakeUrl,
+ headers,
+ PAYLOAD_SIZE_LIMIT,
+ Function.identity());
+
this.circuitBreaker = new CircuitBreaker<>(
- this::doSend,
+ payloadSender::send,
this::fallback,
this::handleError,
100,
@@ -349,51 +339,15 @@ public ApiLogWriteStrategy(String logIntakeUrl, Secret apiKey, HttpClient httpCl
}
@Override
- public void send(List logs) {
+ public void send(List logs) {
circuitBreaker.accept(logs);
}
- private void doSend(List payloads) throws Exception {
- Map headers = new HashMap<>();
- headers.put("DD-API-KEY", Secret.toString(apiKey));
- headers.put("Content-Encoding", "gzip");
-
- ByteArrayOutputStream request = new ByteArrayOutputStream();
- GZIPOutputStream gzip = new GZIPOutputStream(request);
- // the backend checks the size limit against the uncompressed body of the request
- int uncompressedRequestLength = 0;
-
- for (String payload : payloads) {
- byte[] body = payload.getBytes(StandardCharsets.UTF_8);
- if (body.length + 2 > PAYLOAD_SIZE_LIMIT) { // + 2 is for array beginning and end: []
- logger.severe("Dropping a log because payload size (" + body.length + ") exceeds the allowed limit of " + PAYLOAD_SIZE_LIMIT);
- continue;
- }
-
- if (uncompressedRequestLength + body.length + 2 > PAYLOAD_SIZE_LIMIT) { // + 2 is for comma and array end: ,]
- gzip.write(END_JSON_ARRAY);
- gzip.close();
- httpClient.post(logIntakeUrl, headers, "application/json", request.toByteArray(), Function.identity());
- request = new ByteArrayOutputStream();
- gzip = new GZIPOutputStream(request);
- uncompressedRequestLength = 0;
- }
-
- gzip.write(uncompressedRequestLength == 0 ? BEGIN_JSON_ARRAY : COMMA);
- gzip.write(body);
- uncompressedRequestLength += body.length + 1;
- }
-
- gzip.write(END_JSON_ARRAY);
- gzip.close();
- httpClient.post(logIntakeUrl, headers, "application/json", request.toByteArray(), Function.identity());
- }
-
private void handleError(Exception e) {
DatadogUtilities.severe(logger, e, "Failed to post logs");
}
- private void fallback(List payloads) {
+ private void fallback(List payloads) {
// cannot establish connection to API, do nothing
}
@@ -405,34 +359,26 @@ public void close() {
@Override
public TraceWriteStrategy createTraceWriteStrategy() {
- return new TraceWriteStrategyImpl(Track.WEBHOOK, this::sendSpans);
- }
-
- private void sendSpans(Collection spans) {
DatadogGlobalConfiguration datadogGlobalDescriptor = DatadogUtilities.getDatadogGlobalDescriptor();
String urlParameters = datadogGlobalDescriptor != null ? "?service=" + datadogGlobalDescriptor.getCiInstanceName() : "";
String url = webhookIntakeUrl + urlParameters;
- Map headers = new HashMap<>();
- headers.put("DD-API-KEY", Secret.toString(apiKey));
- headers.put("DD-CI-PROVIDER-NAME", "jenkins");
-
- for (Payload span : spans) {
- if (span.getTrack() != Track.WEBHOOK) {
- logger.severe("Expected webhook track, got " + span.getTrack() + ", dropping span");
- continue;
- }
-
- byte[] body = span.getJson().toString().getBytes(StandardCharsets.UTF_8);
- if (body.length > PAYLOAD_SIZE_LIMIT) {
- logger.severe("Dropping span because payload size (" + body.length + ") exceeds the allowed limit of " + PAYLOAD_SIZE_LIMIT);
- continue;
- }
-
- // webhook intake does not support batch requests
- logger.fine("Sending webhook");
- httpClient.postAsynchronously(url, headers, "application/json", body);
+ // TODO use CompressedBatchSender unconditionally in the next release
+ JsonPayloadSender payloadSender;
+ if (DatadogUtilities.envVar(ENABLE_TRACES_BATCHING_ENV_VAR, false)) {
+ Map headers = Map.of(
+ "DD-API-KEY", Secret.toString(apiKey),
+ "DD-CI-PROVIDER-NAME", "jenkins",
+ "Content-Encoding", "gzip");
+ payloadSender = new CompressedBatchSender<>(httpClient, url, headers, PAYLOAD_SIZE_LIMIT, p -> p.getJson());
+ } else {
+ Map headers = Map.of(
+ "DD-API-KEY", Secret.toString(apiKey),
+ "DD-CI-PROVIDER-NAME", "jenkins");
+ payloadSender = new SimpleSender<>(httpClient, url, headers, p -> p.getJson());
}
+
+ return new TraceWriteStrategyImpl(Track.WEBHOOK, payloadSender::send);
}
@Override
diff --git a/src/main/java/org/datadog/jenkins/plugins/datadog/clients/JsonPayloadSender.java b/src/main/java/org/datadog/jenkins/plugins/datadog/clients/JsonPayloadSender.java
new file mode 100644
index 00000000..3d2f412d
--- /dev/null
+++ b/src/main/java/org/datadog/jenkins/plugins/datadog/clients/JsonPayloadSender.java
@@ -0,0 +1,7 @@
+package org.datadog.jenkins.plugins.datadog.clients;
+
+import java.util.Collection;
+
+public interface JsonPayloadSender {
+ void send(Collection payloads) throws Exception;
+}
diff --git a/src/main/java/org/datadog/jenkins/plugins/datadog/clients/SimpleSender.java b/src/main/java/org/datadog/jenkins/plugins/datadog/clients/SimpleSender.java
new file mode 100644
index 00000000..1b168e3f
--- /dev/null
+++ b/src/main/java/org/datadog/jenkins/plugins/datadog/clients/SimpleSender.java
@@ -0,0 +1,34 @@
+package org.datadog.jenkins.plugins.datadog.clients;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Map;
+import java.util.function.Function;
+import net.sf.json.JSONObject;
+
+public class SimpleSender implements JsonPayloadSender {
+
+ private final HttpClient httpClient;
+ private final String url;
+ private final Map headers;
+ private final Function payloadToJson;
+
+ public SimpleSender(HttpClient httpClient,
+ String url,
+ Map headers,
+ Function payloadToJson) {
+ this.httpClient = httpClient;
+ this.url = url;
+ this.headers = headers;
+ this.payloadToJson = payloadToJson;
+ }
+
+ @Override
+ public void send(Collection payloads) throws Exception {
+ for (T payload : payloads) {
+ JSONObject json = payloadToJson.apply(payload);
+ byte[] body = json.toString().getBytes(StandardCharsets.UTF_8);
+ httpClient.postAsynchronously(url, headers, "application/json", body);
+ }
+ }
+}
diff --git a/src/main/java/org/datadog/jenkins/plugins/datadog/flare/ConnectivityChecksFlare.java b/src/main/java/org/datadog/jenkins/plugins/datadog/flare/ConnectivityChecksFlare.java
index 7e455670..f607c6dc 100644
--- a/src/main/java/org/datadog/jenkins/plugins/datadog/flare/ConnectivityChecksFlare.java
+++ b/src/main/java/org/datadog/jenkins/plugins/datadog/flare/ConnectivityChecksFlare.java
@@ -17,7 +17,7 @@ public class ConnectivityChecksFlare implements FlareContributor {
@Override
public int order() {
- return 2;
+ return ORDER.CONNECTIVITY_CHECKS;
}
@Override
diff --git a/src/main/java/org/datadog/jenkins/plugins/datadog/flare/DatadogConfigFlare.java b/src/main/java/org/datadog/jenkins/plugins/datadog/flare/DatadogConfigFlare.java
index f2a13911..e941ba25 100644
--- a/src/main/java/org/datadog/jenkins/plugins/datadog/flare/DatadogConfigFlare.java
+++ b/src/main/java/org/datadog/jenkins/plugins/datadog/flare/DatadogConfigFlare.java
@@ -21,7 +21,7 @@ public class DatadogConfigFlare implements FlareContributor {
@Override
public int order() {
- return 1;
+ return ORDER.CONFIG;
}
@Override
diff --git a/src/main/java/org/datadog/jenkins/plugins/datadog/flare/DatadogEnvVarsFlare.java b/src/main/java/org/datadog/jenkins/plugins/datadog/flare/DatadogEnvVarsFlare.java
index e6582727..e7079d9c 100644
--- a/src/main/java/org/datadog/jenkins/plugins/datadog/flare/DatadogEnvVarsFlare.java
+++ b/src/main/java/org/datadog/jenkins/plugins/datadog/flare/DatadogEnvVarsFlare.java
@@ -12,7 +12,7 @@ public class DatadogEnvVarsFlare implements FlareContributor {
@Override
public int order() {
- return 3;
+ return ORDER.ENV_VARS;
}
@Override
diff --git a/src/main/java/org/datadog/jenkins/plugins/datadog/flare/ExceptionsFlare.java b/src/main/java/org/datadog/jenkins/plugins/datadog/flare/ExceptionsFlare.java
index 372fe566..0bab729c 100644
--- a/src/main/java/org/datadog/jenkins/plugins/datadog/flare/ExceptionsFlare.java
+++ b/src/main/java/org/datadog/jenkins/plugins/datadog/flare/ExceptionsFlare.java
@@ -18,7 +18,7 @@ public class ExceptionsFlare implements FlareContributor {
@Override
public int order() {
- return 4;
+ return ORDER.EXCEPTIONS;
}
@Override
diff --git a/src/main/java/org/datadog/jenkins/plugins/datadog/flare/FlareContributor.java b/src/main/java/org/datadog/jenkins/plugins/datadog/flare/FlareContributor.java
index fa10fe53..0accfe31 100644
--- a/src/main/java/org/datadog/jenkins/plugins/datadog/flare/FlareContributor.java
+++ b/src/main/java/org/datadog/jenkins/plugins/datadog/flare/FlareContributor.java
@@ -4,6 +4,19 @@
public interface FlareContributor {
+ interface ORDER {
+ int RUNTIME_INFO = 0;
+ int CONFIG = 1;
+ int CONNECTIVITY_CHECKS = 2;
+ int ENV_VARS = 3;
+ int WRITERS_HEALTH = 4;
+ int EXCEPTIONS = 5;
+ int PLUGIN_LOGS = 6;
+ int JENKINS_LOGS = 7;
+ int THREAD_DUMP = 8;
+ int JFR = 9;
+ }
+
default boolean isEnabledByDefault() {
return true;
}
diff --git a/src/main/java/org/datadog/jenkins/plugins/datadog/flare/JenkinsLogsFlare.java b/src/main/java/org/datadog/jenkins/plugins/datadog/flare/JenkinsLogsFlare.java
index 1130f176..087ee8e4 100644
--- a/src/main/java/org/datadog/jenkins/plugins/datadog/flare/JenkinsLogsFlare.java
+++ b/src/main/java/org/datadog/jenkins/plugins/datadog/flare/JenkinsLogsFlare.java
@@ -25,7 +25,7 @@ public class JenkinsLogsFlare implements FlareContributor {
@Override
public int order() {
- return 6;
+ return ORDER.JENKINS_LOGS;
}
@Override
diff --git a/src/main/java/org/datadog/jenkins/plugins/datadog/flare/JfrFlare.java b/src/main/java/org/datadog/jenkins/plugins/datadog/flare/JfrFlare.java
index be6d4704..aa8a589f 100644
--- a/src/main/java/org/datadog/jenkins/plugins/datadog/flare/JfrFlare.java
+++ b/src/main/java/org/datadog/jenkins/plugins/datadog/flare/JfrFlare.java
@@ -21,7 +21,7 @@ public boolean isEnabledByDefault() {
@Override
public int order() {
- return 8;
+ return ORDER.JFR;
}
@Override
diff --git a/src/main/java/org/datadog/jenkins/plugins/datadog/flare/PluginLogsFlare.java b/src/main/java/org/datadog/jenkins/plugins/datadog/flare/PluginLogsFlare.java
index 0b91d918..208af141 100644
--- a/src/main/java/org/datadog/jenkins/plugins/datadog/flare/PluginLogsFlare.java
+++ b/src/main/java/org/datadog/jenkins/plugins/datadog/flare/PluginLogsFlare.java
@@ -56,7 +56,7 @@ public void onStartup() {
@Override
public int order() {
- return 5;
+ return ORDER.PLUGIN_LOGS;
}
@Override
diff --git a/src/main/java/org/datadog/jenkins/plugins/datadog/flare/RuntimeInfoFlare.java b/src/main/java/org/datadog/jenkins/plugins/datadog/flare/RuntimeInfoFlare.java
index 043eb833..bfcca612 100644
--- a/src/main/java/org/datadog/jenkins/plugins/datadog/flare/RuntimeInfoFlare.java
+++ b/src/main/java/org/datadog/jenkins/plugins/datadog/flare/RuntimeInfoFlare.java
@@ -15,7 +15,7 @@ public class RuntimeInfoFlare implements FlareContributor {
@Override
public int order() {
- return 0;
+ return ORDER.RUNTIME_INFO;
}
@Override
diff --git a/src/main/java/org/datadog/jenkins/plugins/datadog/flare/ThreadDumpFlare.java b/src/main/java/org/datadog/jenkins/plugins/datadog/flare/ThreadDumpFlare.java
index 4ca9167d..74206ac0 100644
--- a/src/main/java/org/datadog/jenkins/plugins/datadog/flare/ThreadDumpFlare.java
+++ b/src/main/java/org/datadog/jenkins/plugins/datadog/flare/ThreadDumpFlare.java
@@ -12,7 +12,7 @@ public class ThreadDumpFlare implements FlareContributor {
@Override
public int order() {
- return 7;
+ return ORDER.THREAD_DUMP;
}
@Override
diff --git a/src/main/java/org/datadog/jenkins/plugins/datadog/flare/WritersHealthFlare.java b/src/main/java/org/datadog/jenkins/plugins/datadog/flare/WritersHealthFlare.java
new file mode 100644
index 00000000..bbce132f
--- /dev/null
+++ b/src/main/java/org/datadog/jenkins/plugins/datadog/flare/WritersHealthFlare.java
@@ -0,0 +1,44 @@
+package org.datadog.jenkins.plugins.datadog.flare;
+
+import com.codahale.metrics.json.MetricsModule;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import hudson.Extension;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.TimeUnit;
+import org.datadog.jenkins.plugins.datadog.util.AsyncWriter;
+
+@Extension
+public class WritersHealthFlare implements FlareContributor {
+
+ private final ObjectMapper objectMapper;
+
+ public WritersHealthFlare() {
+ objectMapper = new ObjectMapper()
+ .registerModule(new MetricsModule(TimeUnit.SECONDS, TimeUnit.MILLISECONDS, false))
+ .configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false)
+ .enable(SerializationFeature.INDENT_OUTPUT);
+ }
+
+ @Override
+ public int order() {
+ return ORDER.WRITERS_HEALTH;
+ }
+
+ @Override
+ public String getDescription() {
+ return "Health stats for async writers (traces, logs)";
+ }
+
+ @Override
+ public String getFilename() {
+ return "writers-health.json";
+ }
+
+ @Override
+ public void writeFileContents(OutputStream out) throws IOException {
+ objectMapper.writeValue(out, AsyncWriter.METRICS);
+ }
+}
diff --git a/src/main/java/org/datadog/jenkins/plugins/datadog/logs/DatadogWriter.java b/src/main/java/org/datadog/jenkins/plugins/datadog/logs/DatadogWriter.java
index d96ea384..6794e6e0 100644
--- a/src/main/java/org/datadog/jenkins/plugins/datadog/logs/DatadogWriter.java
+++ b/src/main/java/org/datadog/jenkins/plugins/datadog/logs/DatadogWriter.java
@@ -72,9 +72,9 @@ public void write(String line) {
payload.put("timestamp", System.currentTimeMillis());
payload.put(PipelineStepData.StepType.PIPELINE.getTagName() + CITags._NAME, this.buildData.getJobName());
- AsyncWriter logWriter = LogWriterFactory.getLogWriter();
+ AsyncWriter logWriter = LogWriterFactory.getLogWriter();
if (logWriter != null) {
- logWriter.submit(payload.toString());
+ logWriter.submit(payload);
}
} catch (Exception e) {
diff --git a/src/main/java/org/datadog/jenkins/plugins/datadog/logs/LogWriteStrategy.java b/src/main/java/org/datadog/jenkins/plugins/datadog/logs/LogWriteStrategy.java
index 70d17751..771e2bf4 100644
--- a/src/main/java/org/datadog/jenkins/plugins/datadog/logs/LogWriteStrategy.java
+++ b/src/main/java/org/datadog/jenkins/plugins/datadog/logs/LogWriteStrategy.java
@@ -1,12 +1,13 @@
package org.datadog.jenkins.plugins.datadog.logs;
+import net.sf.json.JSONObject;
import java.util.List;
public interface LogWriteStrategy {
LogWriteStrategy NO_OP = new LogWriteStrategy() {
@Override
- public void send(List logs) {
+ public void send(List logs) {
// no op
}
@@ -16,6 +17,6 @@ public void close() {
}
};
- void send(List logs);
+ void send(List logs);
void close();
}
diff --git a/src/main/java/org/datadog/jenkins/plugins/datadog/logs/LogWriterFactory.java b/src/main/java/org/datadog/jenkins/plugins/datadog/logs/LogWriterFactory.java
index 20978956..7cb9aa46 100644
--- a/src/main/java/org/datadog/jenkins/plugins/datadog/logs/LogWriterFactory.java
+++ b/src/main/java/org/datadog/jenkins/plugins/datadog/logs/LogWriterFactory.java
@@ -1,6 +1,7 @@
package org.datadog.jenkins.plugins.datadog.logs;
import hudson.init.Terminator;
+import net.sf.json.JSONObject;
import org.datadog.jenkins.plugins.datadog.util.AsyncWriter;
import org.datadog.jenkins.plugins.datadog.DatadogClient;
import org.datadog.jenkins.plugins.datadog.DatadogUtilities;
@@ -15,13 +16,13 @@ public class LogWriterFactory {
private static final String POLLING_TIMEOUT_ENV_VAR = "DD_JENKINS_LOGS_POLLING_TIMEOUT_SECONDS";
private static final String BATCH_SIZE_LIMIT_ENV_VAR = "DD_JENKINS_LOGS_BATCH_SIZE_LIMIT";
- private static final int DEFAULT_QUEUE_CAPACITY = 5_000;
- private static final int DEFAULT_SUBMIT_TIMEOUT_SECONDS = 5;
+ private static final int DEFAULT_QUEUE_CAPACITY = 10_000;
+ private static final int DEFAULT_SUBMIT_TIMEOUT_SECONDS = 0;
private static final int DEFAULT_STOP_TIMEOUT_SECONDS = 10;
private static final int DEFAULT_POLLING_TIMEOUT_SECONDS = 2;
- private static final int DEFAULT_BATCH_SIZE_LIMIT = 100;
+ private static final int DEFAULT_BATCH_SIZE_LIMIT = 500;
- private static volatile AsyncWriter LOG_WRITER;
+ private static volatile AsyncWriter LOG_WRITER;
public static synchronized void onDatadogClientUpdate(@Nullable DatadogClient client) {
if (client == null) {
@@ -57,7 +58,7 @@ public static synchronized void stop() throws InterruptedException {
}
@Nullable
- public static AsyncWriter getLogWriter() {
+ public static AsyncWriter getLogWriter() {
return LOG_WRITER;
}
}
diff --git a/src/main/java/org/datadog/jenkins/plugins/datadog/traces/write/TraceWriteStrategy.java b/src/main/java/org/datadog/jenkins/plugins/datadog/traces/write/TraceWriteStrategy.java
index 9d3143c3..99668d55 100644
--- a/src/main/java/org/datadog/jenkins/plugins/datadog/traces/write/TraceWriteStrategy.java
+++ b/src/main/java/org/datadog/jenkins/plugins/datadog/traces/write/TraceWriteStrategy.java
@@ -9,6 +9,9 @@
import java.util.Collection;
public interface TraceWriteStrategy {
+
+ String ENABLE_TRACES_BATCHING_ENV_VAR = "DD_JENKINS_ENABLE_TRACES_BATCHING";
+
@Nullable
Payload serialize(BuildData buildData, Run, ?> run);
diff --git a/src/main/java/org/datadog/jenkins/plugins/datadog/traces/write/TraceWriter.java b/src/main/java/org/datadog/jenkins/plugins/datadog/traces/write/TraceWriter.java
index df77e523..478f46fc 100644
--- a/src/main/java/org/datadog/jenkins/plugins/datadog/traces/write/TraceWriter.java
+++ b/src/main/java/org/datadog/jenkins/plugins/datadog/traces/write/TraceWriter.java
@@ -18,10 +18,10 @@ public final class TraceWriter {
private static final String POLLING_TIMEOUT_ENV_VAR = "DD_JENKINS_TRACES_POLLING_TIMEOUT_SECONDS";
private static final String BATCH_SIZE_LIMIT_ENV_VAR = "DD_JENKINS_TRACES_BATCH_SIZE_LIMIT";
private static final int DEFAULT_QUEUE_CAPACITY = 10_000;
- private static final int DEFAULT_SUBMIT_TIMEOUT_SECONDS = 30;
+ private static final int DEFAULT_SUBMIT_TIMEOUT_SECONDS = 0;
private static final int DEFAULT_STOP_TIMEOUT_SECONDS = 10;
private static final int DEFAULT_POLLING_TIMEOUT_SECONDS = 5;
- private static final int DEFAULT_BATCH_SIZE_LIMIT = 100;
+ private static final int DEFAULT_BATCH_SIZE_LIMIT = 500;
private final TraceWriteStrategy traceWriteStrategy;
private final AsyncWriter asyncWriter;
diff --git a/src/main/java/org/datadog/jenkins/plugins/datadog/util/AsyncWriter.java b/src/main/java/org/datadog/jenkins/plugins/datadog/util/AsyncWriter.java
index 4faae808..498fc4a2 100644
--- a/src/main/java/org/datadog/jenkins/plugins/datadog/util/AsyncWriter.java
+++ b/src/main/java/org/datadog/jenkins/plugins/datadog/util/AsyncWriter.java
@@ -1,8 +1,10 @@
package org.datadog.jenkins.plugins.datadog.util;
-import org.datadog.jenkins.plugins.datadog.DatadogUtilities;
-
-import javax.annotation.Nullable;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
@@ -10,10 +12,15 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
+import java.util.logging.Level;
import java.util.logging.Logger;
+import javax.annotation.Nullable;
+import org.datadog.jenkins.plugins.datadog.DatadogUtilities;
public final class AsyncWriter {
+ public static final MetricRegistry METRICS = new MetricRegistry();
+
private static final Logger logger = Logger.getLogger(AsyncWriter.class.getName());
private final Consumer> queueConsumer;
@@ -27,6 +34,12 @@ public final class AsyncWriter {
private final int stopTimeoutSeconds;
private final int batchSizeLimit;
+ private final Timer submit;
+ private final Meter submitDropped;
+ private final Timer dispatch;
+ private final Gauge queueSize;
+ private final Histogram batchSize;
+
public AsyncWriter(String name,
Consumer> queueConsumer,
Runnable onStop,
@@ -44,6 +57,11 @@ public AsyncWriter(String name,
this.pollingTimeoutSeconds = pollingTimeoutSeconds;
this.stopTimeoutSeconds = stopTimeoutSeconds;
this.batchSizeLimit = batchSizeLimit;
+ this.submit = METRICS.timer(name + ".submit");
+ this.submitDropped = METRICS.meter(name + ".submit.dropped");
+ this.dispatch = METRICS.timer(name + ".dispatch");
+ this.queueSize = METRICS.gauge(name + ".queue.size", () -> queue::size);
+ this.batchSize = METRICS.histogram(name + ".batch.size");
}
public void start() {
@@ -62,8 +80,16 @@ public void stopSynchronously() throws InterruptedException {
}
public void submit(@Nullable T element) throws InterruptedException, TimeoutException {
- if (element != null && !queue.offer(element, submitTimeoutSeconds, TimeUnit.SECONDS)) {
- throw new TimeoutException("Timed out while submitting span: " + name);
+ if (element == null) {
+ return;
+ }
+ try (Timer.Context submitTime = submit.time()) {
+ if (!queue.offer(element, submitTimeoutSeconds, TimeUnit.SECONDS)) {
+ submitDropped.mark();
+ if (logger.isLoggable(Level.FINE)) {
+ logger.log(Level.FINE, "Timed out while doing async submit: " + name);
+ }
+ }
}
}
@@ -77,11 +103,14 @@ private void runPollingLoop() {
continue;
}
- List elements = new ArrayList<>(batchSizeLimit);
- elements.add(element);
- queue.drainTo(elements, batchSizeLimit - 1);
+ try (Timer.Context dispatchTime = dispatch.time()) {
+ List elements = new ArrayList<>(batchSizeLimit);
+ elements.add(element);
+ queue.drainTo(elements, batchSizeLimit - 1);
+ queueConsumer.accept(elements);
- queueConsumer.accept(elements);
+ batchSize.update(elements.size());
+ }
} catch (InterruptedException e) {
logger.info("Queue poller thread interrupted: " + name);
@@ -93,4 +122,5 @@ private void runPollingLoop() {
}
logger.info("Queue polling stopped, elements not flushed " + queue.size() + ": " + name);
}
+
}
diff --git a/src/test/java/org/datadog/jenkins/plugins/datadog/clients/CompressedBatchSenderTest.java b/src/test/java/org/datadog/jenkins/plugins/datadog/clients/CompressedBatchSenderTest.java
new file mode 100644
index 00000000..2ff3f3e4
--- /dev/null
+++ b/src/test/java/org/datadog/jenkins/plugins/datadog/clients/CompressedBatchSenderTest.java
@@ -0,0 +1,131 @@
+package org.datadog.jenkins.plugins.datadog.clients;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.io.ByteArrayInputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.zip.GZIPInputStream;
+import net.sf.json.JSONArray;
+import net.sf.json.JSONObject;
+import org.apache.commons.io.IOUtils;
+import org.jetbrains.annotations.NotNull;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+public class CompressedBatchSenderTest {
+
+ private static final String URL = "dummyUrl";
+
+ public static final Map HEADERS;
+ static {
+ HEADERS = new HashMap<>();
+ HEADERS.put("header1", "value1");
+ HEADERS.put("header2", "value2");
+ }
+
+ private static final int BATCH_SIZE = 30;
+
+ private final HttpClient httpClient = mock(HttpClient.class);
+
+ private final CompressedBatchSender