Implement batch submission for traces (#469)
nikita-tkachenko-datadog authored Nov 20, 2024
1 parent 043765b commit 4687cf3
Showing 29 changed files with 532 additions and 177 deletions.
22 changes: 22 additions & 0 deletions docker/controller-node/60-add-script-signature-approvals.groovy
@@ -0,0 +1,22 @@
import org.jenkinsci.plugins.scriptsecurity.scripts.ScriptApproval

// needed by the stress-test-traces-submit pipeline
def scriptApproval = ScriptApproval.get()
def signatures = [
    "field org.datadog.jenkins.plugins.datadog.traces.write.TraceWriter asyncWriter",
    "field org.datadog.jenkins.plugins.datadog.util.AsyncWriter queue",
    "method java.util.concurrent.BlockingQueue remainingCapacity",
    "new java.util.concurrent.atomic.AtomicBoolean boolean",
    "method java.util.concurrent.atomic.AtomicBoolean get",
    "method java.util.concurrent.atomic.AtomicBoolean set boolean",
    "staticMethod org.datadog.jenkins.plugins.datadog.traces.write.TraceWriterFactory getTraceWriter",
]

signatures.each { signature ->
    if (!scriptApproval.getPendingSignatures().any { it.signature == signature }) {
        scriptApproval.approveSignature(signature)
        println "Approved signature: $signature"
    } else {
        println "Signature already pending or approved: $signature"
    }
}
1 change: 1 addition & 0 deletions docker/controller-node/Dockerfile
@@ -34,6 +34,7 @@ COPY 30-create-github-credentials.groovy /usr/share/jenkins/ref/init.groovy.d/30
COPY 40-create-datadog-key_credentials.groovy /usr/share/jenkins/ref/init.groovy.d/40-create-datadog-key_credentials.groovy
COPY 50-create-jobs.groovy /usr/share/jenkins/ref/init.groovy.d/50-create-jobs.groovy
COPY 51-create-multi-branch-pipeline.groovy /usr/share/jenkins/ref/init.groovy.d/51-create-multi-branch-pipeline.groovy
+COPY 60-add-script-signature-approvals.groovy /usr/share/jenkins/ref/init.groovy.d/60-add-script-signature-approvals.groovy

RUN /var/jenkins_home/add-github-to-known-hosts.sh

51 changes: 51 additions & 0 deletions docker/controller-node/jobs/stress-test-traces-submit.cps
@@ -0,0 +1,51 @@
import java.util.concurrent.atomic.AtomicBoolean

node {
    def numBuilds = params.NUM_BUILDS ?: 10
    def numSteps = params.NUM_STEPS ?: 100
    def jobComplete = new AtomicBoolean(false)
    def parallelStages = [:]
    parallelStages['Generate jobs'] = {
        stage('Generate jobs') {
            def builds = [:]
            // Loop to create parallel jobs
            for (int i = 1; i <= numBuilds; i++) {
                def jobIndex = i
                builds["Job-${jobIndex}"] = {
                    echo "Starting Job ${jobIndex}"
                    // Inner loop to create steps within each job
                    for (int j = 1; j <= numSteps; j++) {
                        echo "Executing step ${j} in Job ${jobIndex}"
                        // Execute a shell command to echo random characters
                        sh "echo ${UUID.randomUUID()}"
                    }
                    echo "Finished Load Job ${jobIndex}"
                }
            }
            // Execute all jobs in parallel
            parallel builds
            jobComplete.set(true)
        }
    }
    parallelStages['Print traces queue capacity'] = {
        stage('Print traces queue capacity') {
            script {
                waitUntil {
                    echo "Remaining traces queue capacity ${org.datadog.jenkins.plugins.datadog.traces.write.TraceWriterFactory.getTraceWriter().asyncWriter.queue.remainingCapacity()}"
                    sleep time: 1, unit: 'SECONDS'
                    return jobComplete.get()
                }
            }
        }
    }
    parallel parallelStages
}
10 changes: 10 additions & 0 deletions pom.xml
@@ -215,6 +215,16 @@
      <artifactId>access-modifier-suppressions</artifactId>
      <version>${access-modifier-checker.version}</version>
    </dependency>
+   <dependency>
+     <groupId>io.dropwizard.metrics</groupId>
+     <artifactId>metrics-core</artifactId>
+     <version>4.2.28</version>
+   </dependency>
+   <dependency>
+     <groupId>io.dropwizard.metrics</groupId>
+     <artifactId>metrics-json</artifactId>
+     <version>4.2.28</version>
+   </dependency>
  </dependencies>

  <build>
@@ -1237,24 +1237,24 @@ public static String getDatadogPluginVersion() {
    }

    public static int envVar(String name, int defaultValue) {
-       String value = System.getenv(name);
-       if (value != null) {
-           try {
-               return Integer.parseInt(value);
-           } catch (Exception e) {
-               DatadogUtilities.severe(logger, null, "Invalid value " + value + " provided for env var " + name + ": integer number expected");
-           }
-       }
-       return defaultValue;
+       return envVar(name, Integer::parseInt, defaultValue);
    }

    public static double envVar(String name, double defaultValue) {
+       return envVar(name, Double::parseDouble, defaultValue);
+   }
+
+   public static boolean envVar(String name, boolean defaultValue) {
+       return envVar(name, Boolean::parseBoolean, defaultValue);
+   }
+
+   public static <T> T envVar(String name, Function<String, T> parser, T defaultValue) {
        String value = System.getenv(name);
        if (value != null) {
            try {
-               return Double.parseDouble(value);
+               return parser.apply(value);
            } catch (Exception e) {
-               DatadogUtilities.severe(logger, null, "Invalid value " + value + " provided for env var " + name + ": floating point number expected");
+               DatadogUtilities.severe(logger, null, "Invalid value " + value + " provided for env var " + name);
            }
        }
        return defaultValue;
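The refactor above funnels every typed overload through the new parser-based generic helper. A minimal sketch of what call sites can look like (the env var names, defaults, and the Duration example are illustrative, not taken from the plugin):

import java.time.Duration;

// Hypothetical call sites; variable names and defaults are made up for illustration.
int batchLimit = DatadogUtilities.envVar("DD_EXAMPLE_BATCH_LIMIT_BYTES", 5_000_000); // int overload
boolean batching = DatadogUtilities.envVar("DD_EXAMPLE_ENABLE_BATCHING", false);     // boolean overload
// Any other type works by passing the parser function explicitly:
Duration interval = DatadogUtilities.envVar("DD_EXAMPLE_FLUSH_INTERVAL", Duration::parse, Duration.ofSeconds(30));
// A malformed value is logged via DatadogUtilities.severe and the default is returned.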
@@ -0,0 +1,71 @@
package org.datadog.jenkins.plugins.datadog.clients;

import net.sf.json.JSONObject;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Map;
import java.util.function.Function;
import java.util.logging.Logger;
import java.util.zip.GZIPOutputStream;

public class CompressedBatchSender<T> implements JsonPayloadSender<T> {

    private static final Logger logger = Logger.getLogger(CompressedBatchSender.class.getName());

    private static final byte[] BEGIN_JSON_ARRAY = "[".getBytes(StandardCharsets.UTF_8);
    private static final byte[] END_JSON_ARRAY = "]".getBytes(StandardCharsets.UTF_8);
    private static final byte[] COMMA = ",".getBytes(StandardCharsets.UTF_8);

    private final HttpClient httpClient;
    private final String url;
    private final Map<String, String> headers;
    private final int batchLimitBytes;
    private final Function<T, JSONObject> payloadToJson;

    public CompressedBatchSender(HttpClient httpClient,
                                 String url,
                                 Map<String, String> headers,
                                 int batchLimitBytes,
                                 Function<T, JSONObject> payloadToJson) {
        this.httpClient = httpClient;
        this.url = url;
        this.headers = headers;
        this.batchLimitBytes = batchLimitBytes;
        this.payloadToJson = payloadToJson;
    }

    @Override
    public void send(Collection<T> payloads) throws Exception {
        ByteArrayOutputStream request = new ByteArrayOutputStream();
        GZIPOutputStream gzip = new GZIPOutputStream(request);
        // the backend checks the size limit against the uncompressed body of the request
        int uncompressedRequestLength = 0;

        for (T payload : payloads) {
            JSONObject json = payloadToJson.apply(payload);
            byte[] body = json.toString().getBytes(StandardCharsets.UTF_8);
            if (body.length + 2 > batchLimitBytes) { // + 2 is for array beginning and end: [<payload>]
                logger.severe("Dropping a payload because size (" + body.length + ") exceeds the allowed limit of " + batchLimitBytes);
                continue;
            }

            if (uncompressedRequestLength + body.length + 2 > batchLimitBytes) { // + 2 is for comma and array end: ,<payload>]
                gzip.write(END_JSON_ARRAY);
                gzip.close();
                httpClient.post(url, headers, "application/json", request.toByteArray(), Function.identity());
                request = new ByteArrayOutputStream();
                gzip = new GZIPOutputStream(request);
                uncompressedRequestLength = 0;
            }

            gzip.write(uncompressedRequestLength == 0 ? BEGIN_JSON_ARRAY : COMMA);
            gzip.write(body);
            uncompressedRequestLength += body.length + 1;
        }

        gzip.write(END_JSON_ARRAY);
        gzip.close();
        httpClient.post(url, headers, "application/json", request.toByteArray(), Function.identity());
    }
}
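send() greedily packs JSON payloads into a gzip-compressed JSON array, flushing a request whenever appending the next payload would push the uncompressed batch over batchLimitBytes, and drops any single payload that cannot fit even on its own. A minimal wiring sketch, mirroring the agent client change below (httpClient, url, and payloads are assumed to be in scope):

// Sketch only: header values mirror createTraceWriteStrategy() further down in this commit.
Map<String, String> headers = Map.of(
        "X-Datadog-EVP-Subdomain", "webhook-intake",
        "DD-CI-PROVIDER-NAME", "jenkins",
        "Content-Encoding", "gzip"); // request bodies are gzip-compressed
JsonPayloadSender<Payload> sender =
        new CompressedBatchSender<>(httpClient, url, headers, PAYLOAD_SIZE_LIMIT, p -> p.getJson());
sender.send(payloads); // issues one POST per size-capped batch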
@@ -25,6 +25,8 @@ of this software and associated documentation files (the "Software"), to deal

package org.datadog.jenkins.plugins.datadog.clients;

+import static org.datadog.jenkins.plugins.datadog.traces.write.TraceWriteStrategy.ENABLE_TRACES_BATCHING_ENV_VAR;
+
import com.timgroup.statsd.Event;
import com.timgroup.statsd.NonBlockingStatsDClient;
import com.timgroup.statsd.ServiceCheck;
@@ -301,7 +303,7 @@ private static final class AgentLogWriteStrategy implements LogWriteStrategy {
        private final String host;
        private final int port;

-       private final CircuitBreaker<List<String>> circuitBreaker;
+       private final CircuitBreaker<List<net.sf.json.JSONObject>> circuitBreaker;

        private Socket socket;
        private OutputStream out;
@@ -318,17 +320,17 @@ private AgentLogWriteStrategy(String host, int port) {
                    CircuitBreaker.DEFAULT_DELAY_FACTOR);
        }

-       public void send(List<String> payloads) {
+       public void send(List<net.sf.json.JSONObject> payloads) {
            circuitBreaker.accept(payloads);
        }

-       private void doSend(List<String> payloads) throws Exception {
+       private void doSend(List<net.sf.json.JSONObject> payloads) throws Exception {
            if (socket == null || socket.isClosed() || !socket.isConnected()) {
                socket = new Socket(host, port);
                out = new BufferedOutputStream(socket.getOutputStream());
            }
-           for (String payload : payloads) {
-               out.write(payload.getBytes(StandardCharsets.UTF_8));
+           for (net.sf.json.JSONObject payload : payloads) {
+               out.write(payload.toString().getBytes(StandardCharsets.UTF_8));
                out.write(LINE_SEPARATOR);
            }
        }
@@ -338,7 +340,7 @@ private void handleError(Exception e) {
            DatadogUtilities.severe(logger, e, "Could not write logs to agent");
        }

-       private void fallback(List<String> payloads) {
+       private void fallback(List<net.sf.json.JSONObject> payloads) {
            // cannot establish connection to agent, do nothing
        }

@@ -368,7 +370,26 @@ private void flushSafely() throws IOException {

    @Override
    public TraceWriteStrategy createTraceWriteStrategy() {
-       TraceWriteStrategyImpl evpStrategy = new TraceWriteStrategyImpl(Track.WEBHOOK, this::sendSpansToWebhook);
+       DatadogGlobalConfiguration datadogGlobalDescriptor = DatadogUtilities.getDatadogGlobalDescriptor();
+       String urlParameters = datadogGlobalDescriptor != null ? "?service=" + datadogGlobalDescriptor.getCiInstanceName() : "";
+       String url = String.format("http://%s:%d/evp_proxy/v1/api/v2/webhook/%s", hostname, traceCollectionPort, urlParameters);
+
+       // TODO use CompressedBatchSender unconditionally in the next release
+       JsonPayloadSender<Payload> payloadSender;
+       if (DatadogUtilities.envVar(ENABLE_TRACES_BATCHING_ENV_VAR, false)) {
+           Map<String, String> headers = Map.of(
+                   "X-Datadog-EVP-Subdomain", "webhook-intake",
+                   "DD-CI-PROVIDER-NAME", "jenkins",
+                   "Content-Encoding", "gzip");
+           payloadSender = new CompressedBatchSender<>(client, url, headers, PAYLOAD_SIZE_LIMIT, p -> p.getJson());
+       } else {
+           Map<String, String> headers = Map.of(
+                   "X-Datadog-EVP-Subdomain", "webhook-intake",
+                   "DD-CI-PROVIDER-NAME", "jenkins");
+           payloadSender = new SimpleSender<>(client, url, headers, p -> p.getJson());
+       }
+
+       TraceWriteStrategyImpl evpStrategy = new TraceWriteStrategyImpl(Track.WEBHOOK, payloadSender::send);
        TraceWriteStrategyImpl apmStrategy = new TraceWriteStrategyImpl(Track.APM, this::sendSpansToApm);
        return new AgentTraceWriteStrategy(evpStrategy, apmStrategy, this::isEvpProxySupported);
    }
@@ -410,36 +431,6 @@ Set<String> fetchAgentSupportedEndpoints() {
        }
    }

-   /**
-    * Posts a given payload to the Agent EVP Proxy, so it is forwarded to the Webhook Intake.
-    */
-   private void sendSpansToWebhook(Collection<Payload> spans) {
-       DatadogGlobalConfiguration datadogGlobalDescriptor = DatadogUtilities.getDatadogGlobalDescriptor();
-       String urlParameters = datadogGlobalDescriptor != null ? "?service=" + datadogGlobalDescriptor.getCiInstanceName() : "";
-       String url = String.format("http://%s:%d/evp_proxy/v1/api/v2/webhook/%s", hostname, traceCollectionPort, urlParameters);
-
-       Map<String, String> headers = new HashMap<>();
-       headers.put("X-Datadog-EVP-Subdomain", "webhook-intake");
-       headers.put("DD-CI-PROVIDER-NAME", "jenkins");
-
-       for (Payload span : spans) {
-           if (span.getTrack() != Track.WEBHOOK) {
-               logger.severe("Expected webhook track, got " + span.getTrack() + ", dropping span");
-               continue;
-           }
-
-           byte[] body = span.getJson().toString().getBytes(StandardCharsets.UTF_8);
-           if (body.length > PAYLOAD_SIZE_LIMIT) {
-               logger.severe("Dropping span because payload size (" + body.length + ") exceeds the allowed limit of " + PAYLOAD_SIZE_LIMIT);
-               continue;
-           }
-
-           // webhook intake does not support batch requests
-           logger.fine("Sending webhook");
-           client.postAsynchronously(url, headers, "application/json", body);
-       }
-   }
-
    private void sendSpansToApm(Collection<Payload> spans) {
        try {
            Map<String, net.sf.json.JSONArray> tracesById = new HashMap<>();
