Skip to content

Commit

Permalink
Merge branch 'nikita-tkachenko/ci-visibility-batching' into nikita-tk…
Browse files Browse the repository at this point in the history
…achenko/memory-consumption-optimization
  • Loading branch information
nikita-tkachenko-datadog committed Jan 26, 2024
2 parents 52371d7 + 1756c79 commit d18d872
Show file tree
Hide file tree
Showing 10 changed files with 176 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ of this software and associated documentation files (the "Software"), to deal
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
Expand All @@ -49,14 +49,12 @@ of this software and associated documentation files (the "Software"), to deal
import org.datadog.jenkins.plugins.datadog.DatadogEvent;
import org.datadog.jenkins.plugins.datadog.DatadogGlobalConfiguration;
import org.datadog.jenkins.plugins.datadog.DatadogUtilities;
import org.datadog.jenkins.plugins.datadog.traces.DatadogTraceBuildLogic;
import org.datadog.jenkins.plugins.datadog.traces.DatadogTracePipelineLogic;
import org.datadog.jenkins.plugins.datadog.traces.DatadogWebhookBuildLogic;
import org.datadog.jenkins.plugins.datadog.traces.DatadogWebhookPipelineLogic;
import org.datadog.jenkins.plugins.datadog.traces.mapper.JsonTraceSpanMapper;
import org.datadog.jenkins.plugins.datadog.traces.write.AgentTraceWriteStrategy;
import org.datadog.jenkins.plugins.datadog.traces.write.Payload;
import org.datadog.jenkins.plugins.datadog.traces.write.TraceWriteStrategy;
import org.datadog.jenkins.plugins.datadog.traces.write.TraceWriteStrategyImpl;
import org.datadog.jenkins.plugins.datadog.traces.write.Track;
import org.datadog.jenkins.plugins.datadog.util.SuppressFBWarnings;
import org.datadog.jenkins.plugins.datadog.util.TagsUtil;
import org.json.JSONArray;
Expand Down Expand Up @@ -528,8 +526,8 @@ public boolean sendLogs(String payload) {

@Override
public TraceWriteStrategy createTraceWriteStrategy() {
TraceWriteStrategyImpl evpStrategy = new TraceWriteStrategyImpl(new DatadogWebhookBuildLogic(), new DatadogWebhookPipelineLogic(), this::sendSpansToWebhook);
TraceWriteStrategyImpl apmStrategy = new TraceWriteStrategyImpl(new DatadogTraceBuildLogic(), new DatadogTracePipelineLogic(), this::sendSpansToApm);
TraceWriteStrategyImpl evpStrategy = new TraceWriteStrategyImpl(Track.WEBHOOK, this::sendSpansToWebhook);
TraceWriteStrategyImpl apmStrategy = new TraceWriteStrategyImpl(Track.APM, this::sendSpansToApm);
return new AgentTraceWriteStrategy(evpStrategy, apmStrategy, this::isEvpProxySupported);
}

Expand All @@ -542,19 +540,7 @@ boolean isEvpProxySupported() {
/**
* Posts a given payload to the Agent EVP Proxy, so it is forwarded to the Webhook Intake.
*/
private void sendSpansToWebhook(List<net.sf.json.JSONObject> spans) {
for (net.sf.json.JSONObject span : spans) {
// webhook intake does not support batch requests
postWebhook(span.toString());
}
}

/**
* Posts a given payload to the Agent EVP Proxy, so it is forwarded to the Webhook Intake.
*/
private void postWebhook(String payload) {
logger.fine("Sending webhook");

private void sendSpansToWebhook(Collection<Payload> spans) {
DatadogGlobalConfiguration datadogGlobalDescriptor = DatadogUtilities.getDatadogGlobalDescriptor();
String urlParameters = datadogGlobalDescriptor != null ? "?service=" + datadogGlobalDescriptor.getCiInstanceName() : "";
String url = String.format("http://%s:%d/evp_proxy/v1/api/v2/webhook/%s", hostname, traceCollectionPort, urlParameters);
Expand All @@ -563,15 +549,29 @@ private void postWebhook(String payload) {
headers.put("X-Datadog-EVP-Subdomain", "webhook-intake");
headers.put("DD-CI-PROVIDER-NAME", "jenkins");

byte[] body = payload.getBytes(StandardCharsets.UTF_8);
client.postAsynchronously(url, headers, "application/json", body);
for (Payload span : spans) {
if (span.getTrack() != Track.WEBHOOK) {
logger.severe("Expected webhook track, got " + span.getTrack() + ", dropping span");
continue;
}

byte[] body = span.getJson().toString().getBytes(StandardCharsets.UTF_8);

// webhook intake does not support batch requests
logger.fine("Sending webhook");
client.postAsynchronously(url, headers, "application/json", body);
}
}

private void sendSpansToApm(List<net.sf.json.JSONObject> spans) {
private void sendSpansToApm(Collection<Payload> spans) {
try {
Map<String, net.sf.json.JSONArray> tracesById = new HashMap<>();
for (net.sf.json.JSONObject span : spans) {
tracesById.computeIfAbsent(span.getString(JsonTraceSpanMapper.TRACE_ID), k -> new net.sf.json.JSONArray()).add(span);
for (Payload span : spans) {
if (span.getTrack() != Track.APM) {
logger.severe("Expected APM track, got " + span.getTrack() + ", dropping span");
continue;
}
tracesById.computeIfAbsent(span.getJson().getString(JsonTraceSpanMapper.TRACE_ID), k -> new net.sf.json.JSONArray()).add(span.getJson());
}

final JSONArray jsonTraces = new JSONArray();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ of this software and associated documentation files (the "Software"), to deal

package org.datadog.jenkins.plugins.datadog.clients;

import com.google.common.base.Objects;
import hudson.util.Secret;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
Expand All @@ -45,10 +46,10 @@ of this software and associated documentation files (the "Software"), to deal
import org.datadog.jenkins.plugins.datadog.DatadogEvent;
import org.datadog.jenkins.plugins.datadog.DatadogGlobalConfiguration;
import org.datadog.jenkins.plugins.datadog.DatadogUtilities;
import org.datadog.jenkins.plugins.datadog.traces.DatadogWebhookBuildLogic;
import org.datadog.jenkins.plugins.datadog.traces.DatadogWebhookPipelineLogic;
import org.datadog.jenkins.plugins.datadog.traces.write.Payload;
import org.datadog.jenkins.plugins.datadog.traces.write.TraceWriteStrategy;
import org.datadog.jenkins.plugins.datadog.traces.write.TraceWriteStrategyImpl;
import org.datadog.jenkins.plugins.datadog.traces.write.Track;
import org.datadog.jenkins.plugins.datadog.util.SuppressFBWarnings;
import org.datadog.jenkins.plugins.datadog.util.TagsUtil;

Expand Down Expand Up @@ -102,14 +103,13 @@ public class DatadogApiClient implements DatadogClient {
public static DatadogClient getInstance(String url, String logIntakeUrl, String webhookIntakeUrl, Secret apiKey){
// If the configuration has not changed, return the current instance without validation
// since we've already validated and/or errored about the data

DatadogApiClient newInstance = new DatadogApiClient(url, logIntakeUrl, webhookIntakeUrl, apiKey);
if (instance != null && instance.equals(newInstance)) {
if (instance != null && !configurationChanged(url, logIntakeUrl, webhookIntakeUrl, apiKey)) {
if (DatadogApiClient.failedLastValidation) {
return null;
}
return instance;
}
DatadogApiClient newInstance = new DatadogApiClient(url, logIntakeUrl, webhookIntakeUrl, apiKey);
if (enableValidations) {
synchronized (DatadogApiClient.class) {
DatadogApiClient.instance = newInstance;
Expand All @@ -126,6 +126,13 @@ public static DatadogClient getInstance(String url, String logIntakeUrl, String
return newInstance;
}

private static boolean configurationChanged(String url, String logIntakeUrl, String webhookIntakeUrl, Secret apiKey){
return !Objects.equal(instance.getUrl(), url) ||
!Objects.equal(instance.getLogIntakeUrl(), logIntakeUrl) ||
!Objects.equal(instance.getWebhookIntakeUrl(), webhookIntakeUrl) ||
!Objects.equal(instance.getApiKey(), apiKey);
}

private DatadogApiClient(String url, String logIntakeUrl, String webhookIntakeUrl, Secret apiKey) {
this.url = url;
this.apiKey = apiKey;
Expand Down Expand Up @@ -485,24 +492,10 @@ private boolean validateWebhookIntakeConnection() throws IOException {

@Override
public TraceWriteStrategy createTraceWriteStrategy() {
return new TraceWriteStrategyImpl(new DatadogWebhookBuildLogic(), new DatadogWebhookPipelineLogic(), this::sendSpans);
return new TraceWriteStrategyImpl(Track.WEBHOOK, this::sendSpans);
}

private void sendSpans(List<net.sf.json.JSONObject> spans) {
for (JSONObject span : spans) {
// webhook intake does not support batch requests
postWebhook(span.toString());
}
}

/**
* Posts a given payload to the Datadog Webhook Intake, using the user configured apiKey.
*
* @param payload - A webhook payload.
*/
private void postWebhook(String payload) {
logger.fine("Sending webhook");

private void sendSpans(Collection<Payload> spans) {
if (this.webhookIntakeConnectionBroken) {
throw new RuntimeException("Your client is not initialized properly; webhook intake connection is broken.");
}
Expand All @@ -515,7 +508,17 @@ private void postWebhook(String payload) {
headers.put("DD-API-KEY", Secret.toString(apiKey));
headers.put("DD-CI-PROVIDER-NAME", "jenkins");

byte[] body = payload.getBytes(StandardCharsets.UTF_8);
httpClient.postAsynchronously(url, headers, "application/json", body);
for (Payload span : spans) {
if (span.getTrack() != Track.WEBHOOK) {
logger.severe("Expected webhook track, got " + span.getTrack() + ", dropping span");
continue;
}

byte[] body = span.getJson().toString().getBytes(StandardCharsets.UTF_8);

// webhook intake does not support batch requests
logger.fine("Sending webhook");
httpClient.postAsynchronously(url, headers, "application/json", body);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@

import hudson.model.Run;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.logging.Logger;
import javax.annotation.Nonnull;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import net.sf.json.JSONObject;
import org.datadog.jenkins.plugins.datadog.clients.DatadogAgentClient;
import org.datadog.jenkins.plugins.datadog.model.BuildData;
import org.datadog.jenkins.plugins.datadog.model.BuildPipelineNode;
Expand All @@ -29,6 +30,14 @@ public class AgentTraceWriteStrategy implements TraceWriteStrategy {
private final TraceWriteStrategy evpProxyStrategy;
private final TraceWriteStrategy apmStrategy;
private final Supplier<Boolean> checkEvpProxySupport;
/**
* Whether the Agent supports EVP Proxy.
* <p>
* This value may change from {@code false} to {@code true} if the Agent that this Jenkins talks to gets updated
* (the Agent's support for EVP proxy is checked periodically).
* <p>
* We don't handle agent downgrades, so {@code true} to {@code false} change is not possible.
*/
private volatile boolean evpProxySupported = false;
private volatile long lastEvpProxyCheckTimeMs = 0L;

Expand All @@ -40,33 +49,33 @@ public AgentTraceWriteStrategy(TraceWriteStrategy evpProxyStrategy, TraceWriteSt

@Nullable
@Override
public JSONObject serialize(BuildData buildData, Run<?, ?> run) {
public Payload serialize(BuildData buildData, Run<?, ?> run) {
return getCurrentStrategy().serialize(buildData, run);
}

@Nonnull
@Nullable
@Override
public JSONObject serialize(BuildPipelineNode node, Run<?, ?> run) throws IOException, InterruptedException {
public Payload serialize(BuildPipelineNode node, Run<?, ?> run) throws IOException, InterruptedException {
return getCurrentStrategy().serialize(node, run);
}

@Override
public void send(List<JSONObject> spans) {
// we have to check serialized spans to know where to send them,
public void send(Collection<Payload> spans) {
// we have to check the track for every span,
// because the serialization strategy might've changed in between serialize() and send()
if (isWebhook(spans)) {
evpProxyStrategy.send(spans);
} else {
apmStrategy.send(spans);
}
}
Map<Track, List<Payload>> spansByTrack = spans.stream().collect(Collectors.groupingBy(Payload::getTrack));
for (Map.Entry<Track, List<Payload>> e : spansByTrack.entrySet()) {
Track track = e.getKey();
List<Payload> trackSpans = e.getValue();

private boolean isWebhook(List<JSONObject> spans) {
if (spans.isEmpty()) {
return false;
if (track == Track.WEBHOOK) {
evpProxyStrategy.send(trackSpans);
} else if (track == Track.APM) {
apmStrategy.send(trackSpans);
} else {
throw new IllegalArgumentException("Unexpected track value: " + track);
}
}
JSONObject span = spans.iterator().next();
return span.get("level") != null;
}

private TraceWriteStrategy getCurrentStrategy() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package org.datadog.jenkins.plugins.datadog.traces.write;

import javax.annotation.Nonnull;
import net.sf.json.JSONObject;

public class Payload {

private final JSONObject json;
private final Track track;

public Payload(@Nonnull JSONObject json, @Nonnull Track track) {
this.json = json;
this.track = track;
}

@Nonnull
public JSONObject getJson() {
return json;
}

@Nonnull
public Track getTrack() {
return track;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,17 @@

import hudson.model.Run;
import java.io.IOException;
import java.util.List;
import javax.annotation.Nonnull;
import java.util.Collection;
import javax.annotation.Nullable;
import net.sf.json.JSONObject;
import org.datadog.jenkins.plugins.datadog.model.BuildData;
import org.datadog.jenkins.plugins.datadog.model.BuildPipelineNode;

public interface TraceWriteStrategy {
@Nullable
JSONObject serialize(BuildData buildData, Run<?, ?> run);
Payload serialize(BuildData buildData, Run<?, ?> run);

@Nonnull
JSONObject serialize(BuildPipelineNode node, Run<?, ?> run) throws IOException, InterruptedException;
@Nullable
Payload serialize(BuildPipelineNode node, Run<?, ?> run) throws IOException, InterruptedException;

void send(List<JSONObject> spans);
void send(Collection<Payload> spans);
}
Loading

0 comments on commit d18d872

Please sign in to comment.