Skip to content

Commit

Permalink
Renamed some classes and methods
Browse files Browse the repository at this point in the history
  • Loading branch information
nikita-tkachenko-datadog committed Jan 26, 2024
1 parent eeeb4c2 commit 1756c79
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ of this software and associated documentation files (the "Software"), to deal
import org.datadog.jenkins.plugins.datadog.DatadogUtilities;
import org.datadog.jenkins.plugins.datadog.traces.mapper.JsonTraceSpanMapper;
import org.datadog.jenkins.plugins.datadog.traces.write.AgentTraceWriteStrategy;
import org.datadog.jenkins.plugins.datadog.traces.write.Span;
import org.datadog.jenkins.plugins.datadog.traces.write.Payload;
import org.datadog.jenkins.plugins.datadog.traces.write.TraceWriteStrategy;
import org.datadog.jenkins.plugins.datadog.traces.write.TraceWriteStrategyImpl;
import org.datadog.jenkins.plugins.datadog.traces.write.Track;
Expand Down Expand Up @@ -536,7 +536,7 @@ boolean isEvpProxySupported() {
/**
* Posts a given payload to the Agent EVP Proxy, so it is forwarded to the Webhook Intake.
*/
private void sendSpansToWebhook(Collection<Span> spans) {
private void sendSpansToWebhook(Collection<Payload> spans) {
DatadogGlobalConfiguration datadogGlobalDescriptor = DatadogUtilities.getDatadogGlobalDescriptor();
String urlParameters = datadogGlobalDescriptor != null ? "?service=" + datadogGlobalDescriptor.getCiInstanceName() : "";
String url = String.format("http://%s:%d/evp_proxy/v1/api/v2/webhook/%s", hostname, traceCollectionPort, urlParameters);
Expand All @@ -545,29 +545,29 @@ private void sendSpansToWebhook(Collection<Span> spans) {
headers.put("X-Datadog-EVP-Subdomain", "webhook-intake");
headers.put("DD-CI-PROVIDER-NAME", "jenkins");

for (Span span : spans) {
for (Payload span : spans) {
if (span.getTrack() != Track.WEBHOOK) {
logger.severe("Expected webhook track, got " + span.getTrack() + ", dropping span");
continue;
}

byte[] body = span.getPayload().toString().getBytes(StandardCharsets.UTF_8);
byte[] body = span.getJson().toString().getBytes(StandardCharsets.UTF_8);

// webhook intake does not support batch requests
logger.fine("Sending webhook");
client.postAsynchronously(url, headers, "application/json", body);
}
}

private void sendSpansToApm(Collection<Span> spans) {
private void sendSpansToApm(Collection<Payload> spans) {
try {
Map<String, net.sf.json.JSONArray> tracesById = new HashMap<>();
for (Span span : spans) {
for (Payload span : spans) {
if (span.getTrack() != Track.APM) {
logger.severe("Expected APM track, got " + span.getTrack() + ", dropping span");
continue;
}
tracesById.computeIfAbsent(span.getPayload().getString(JsonTraceSpanMapper.TRACE_ID), k -> new net.sf.json.JSONArray()).add(span.getPayload());
tracesById.computeIfAbsent(span.getJson().getString(JsonTraceSpanMapper.TRACE_ID), k -> new net.sf.json.JSONArray()).add(span.getJson());
}

final JSONArray jsonTraces = new JSONArray();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ of this software and associated documentation files (the "Software"), to deal
import org.datadog.jenkins.plugins.datadog.DatadogEvent;
import org.datadog.jenkins.plugins.datadog.DatadogGlobalConfiguration;
import org.datadog.jenkins.plugins.datadog.DatadogUtilities;
import org.datadog.jenkins.plugins.datadog.traces.write.Span;
import org.datadog.jenkins.plugins.datadog.traces.write.Payload;
import org.datadog.jenkins.plugins.datadog.traces.write.TraceWriteStrategy;
import org.datadog.jenkins.plugins.datadog.traces.write.TraceWriteStrategyImpl;
import org.datadog.jenkins.plugins.datadog.traces.write.Track;
Expand Down Expand Up @@ -495,7 +495,7 @@ public TraceWriteStrategy createTraceWriteStrategy() {
return new TraceWriteStrategyImpl(Track.WEBHOOK, this::sendSpans);
}

private void sendSpans(Collection<Span> spans) {
private void sendSpans(Collection<Payload> spans) {
if (this.webhookIntakeConnectionBroken) {
throw new RuntimeException("Your client is not initialized properly; webhook intake connection is broken.");
}
Expand All @@ -508,13 +508,13 @@ private void sendSpans(Collection<Span> spans) {
headers.put("DD-API-KEY", Secret.toString(apiKey));
headers.put("DD-CI-PROVIDER-NAME", "jenkins");

for (Span span : spans) {
for (Payload span : spans) {
if (span.getTrack() != Track.WEBHOOK) {
logger.severe("Expected webhook track, got " + span.getTrack() + ", dropping span");
continue;
}

byte[] body = span.getPayload().toString().getBytes(StandardCharsets.UTF_8);
byte[] body = span.getJson().toString().getBytes(StandardCharsets.UTF_8);

// webhook intake does not support batch requests
logger.fine("Sending webhook");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,24 +47,24 @@ public AgentTraceWriteStrategy(TraceWriteStrategy evpProxyStrategy, TraceWriteSt
}

@Override
public Span createSpan(BuildData buildData, Run<?, ?> run) {
return getCurrentStrategy().createSpan(buildData, run);
public Payload serialize(BuildData buildData, Run<?, ?> run) {
return getCurrentStrategy().serialize(buildData, run);
}

@Nonnull
@Override
public Collection<Span> createSpan(FlowNode flowNode, Run<?, ?> run) {
return getCurrentStrategy().createSpan(flowNode, run);
public Collection<Payload> serialize(FlowNode flowNode, Run<?, ?> run) {
return getCurrentStrategy().serialize(flowNode, run);
}

@Override
public void send(Collection<Span> spans) {
public void send(Collection<Payload> spans) {
// we have to check the track for every span,
// because the serialization strategy might've changed in between serialize() and send()
Map<Track, List<Span>> spansByTrack = spans.stream().collect(Collectors.groupingBy(Span::getTrack));
for (Map.Entry<Track, List<Span>> e : spansByTrack.entrySet()) {
Map<Track, List<Payload>> spansByTrack = spans.stream().collect(Collectors.groupingBy(Payload::getTrack));
for (Map.Entry<Track, List<Payload>> e : spansByTrack.entrySet()) {
Track track = e.getKey();
List<Span> trackSpans = e.getValue();
List<Payload> trackSpans = e.getValue();

if (track == Track.WEBHOOK) {
evpProxyStrategy.send(trackSpans);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,19 @@
import javax.annotation.Nonnull;
import net.sf.json.JSONObject;

public class Span {
public class Payload {

private final JSONObject payload;
private final JSONObject json;
private final Track track;

public Span(@Nonnull JSONObject payload, @Nonnull Track track) {
this.payload = payload;
public Payload(@Nonnull JSONObject json, @Nonnull Track track) {
this.json = json;
this.track = track;
}

@Nonnull
public JSONObject getPayload() {
return payload;
public JSONObject getJson() {
return json;
}

@Nonnull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@

public interface TraceWriteStrategy {
@Nullable
Span createSpan(BuildData buildData, Run<?, ?> run);
Payload serialize(BuildData buildData, Run<?, ?> run);

@Nonnull
Collection<Span> createSpan(FlowNode flowNode, Run<?, ?> run);
Collection<Payload> serialize(FlowNode flowNode, Run<?, ?> run);

void send(Collection<Span> spans);
void send(Collection<Payload> spans);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import hudson.model.Run;
import java.util.Collection;
import java.util.Collections;
import java.util.function.Consumer;
import java.util.logging.Logger;
import java.util.stream.Collectors;
Expand All @@ -26,9 +25,9 @@ public class TraceWriteStrategyImpl implements TraceWriteStrategy {
private final Track track;
private final DatadogBaseBuildLogic buildLogic;
private final DatadogBasePipelineLogic pipelineLogic;
private final CircuitBreaker<Collection<Span>> sendSpansCircuitBreaker;
private final CircuitBreaker<Collection<Payload>> sendSpansCircuitBreaker;

public TraceWriteStrategyImpl(Track track, Consumer<Collection<Span>> spansSender) {
public TraceWriteStrategyImpl(Track track, Consumer<Collection<Payload>> spansSender) {
if (track == Track.APM) {
this.buildLogic = new DatadogTraceBuildLogic();
this.pipelineLogic = new DatadogTracePipelineLogic();
Expand All @@ -47,24 +46,24 @@ public TraceWriteStrategyImpl(Track track, Consumer<Collection<Span>> spansSende
}

@Override
public Span createSpan(final BuildData buildData, final Run<?, ?> run) {
public Payload serialize(final BuildData buildData, final Run<?, ?> run) {
JSONObject buildSpan = buildLogic.finishBuildTrace(buildData, run);
return buildSpan != null ? new Span(buildSpan, track) : null;
return buildSpan != null ? new Payload(buildSpan, track) : null;
}

@Nonnull
@Override
public Collection<Span> createSpan(FlowNode flowNode, Run<?, ?> run) {
public Collection<Payload> serialize(FlowNode flowNode, Run<?, ?> run) {
Collection<JSONObject> stepSpans = pipelineLogic.execute(flowNode, run);
return stepSpans.stream().map(payload -> new Span(payload, track)).collect(Collectors.toList());
return stepSpans.stream().map(payload -> new Payload(payload, track)).collect(Collectors.toList());
}

@Override
public void send(Collection<Span> serializationResult) {
public void send(Collection<Payload> serializationResult) {
sendSpansCircuitBreaker.accept(serializationResult);
}

private void logTransportBroken(Collection<Span> spans) {
private void logTransportBroken(Collection<Payload> spans) {
logger.fine("Ignoring " + spans.size() + " because transport is broken");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public final class TraceWriter {
private static final int DEFAULT_BATCH_SIZE_LIMIT = 100;

private final TraceWriteStrategy traceWriteStrategy;
private final BlockingQueue<Span> queue;
private final BlockingQueue<Payload> queue;
private final Thread poller;

public TraceWriter(DatadogClient datadogClient) {
Expand All @@ -54,18 +54,18 @@ public void stopSynchronously() throws InterruptedException {
}

public void submitBuild(final BuildData buildData, final Run<?,?> run) throws InterruptedException, TimeoutException {
Span span = traceWriteStrategy.createSpan(buildData, run);
Payload span = traceWriteStrategy.serialize(buildData, run);
submit(span);
}

public void submitPipelineStep(FlowNode flowNode, Run<?, ?> run) throws InterruptedException, TimeoutException {
Collection<Span> spans = traceWriteStrategy.createSpan(flowNode, run);
for (Span span : spans) {
Collection<Payload> spans = traceWriteStrategy.serialize(flowNode, run);
for (Payload span : spans) {
submit(span);
}
}

private void submit(@Nullable Span span) throws InterruptedException, TimeoutException {
private void submit(@Nullable Payload span) throws InterruptedException, TimeoutException {
if (span != null && !queue.offer(span, getEnv(SUBMIT_TIMEOUT_ENV_VAR, DEFAULT_SUBMIT_TIMEOUT_SECONDS), TimeUnit.SECONDS)) {
throw new TimeoutException("Timed out while submitting span");
}
Expand All @@ -75,14 +75,14 @@ private void runPollingLoop() {
long stopPollingAt = Long.MAX_VALUE;
while (System.currentTimeMillis() < stopPollingAt) {
try {
Span span = queue.poll(getEnv(POLLING_TIMEOUT_ENV_VAR, DEFAULT_POLLING_TIMEOUT_SECONDS), TimeUnit.SECONDS);
Payload span = queue.poll(getEnv(POLLING_TIMEOUT_ENV_VAR, DEFAULT_POLLING_TIMEOUT_SECONDS), TimeUnit.SECONDS);
if (span == null) {
// nothing to send
continue;
}

int batchSize = getEnv(BATCH_SIZE_LIMIT_ENV_VAR, DEFAULT_BATCH_SIZE_LIMIT);
List<Span> spans = new ArrayList<>(batchSize);
List<Payload> spans = new ArrayList<>(batchSize);
spans.add(span);
queue.drainTo(spans, batchSize - 1);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
package org.datadog.jenkins.plugins.datadog.traces.write;

public enum Track {APM, WEBHOOK}
public enum Track {
APM, WEBHOOK
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ of this software and associated documentation files (the "Software"), to deal
import org.datadog.jenkins.plugins.datadog.traces.DatadogWebhookPipelineLogic;
import org.datadog.jenkins.plugins.datadog.traces.mapper.JsonTraceSpanMapper;
import org.datadog.jenkins.plugins.datadog.traces.message.TraceSpan;
import org.datadog.jenkins.plugins.datadog.traces.write.Span;
import org.datadog.jenkins.plugins.datadog.traces.write.Payload;
import org.datadog.jenkins.plugins.datadog.traces.write.TraceWriteStrategy;
import org.datadog.jenkins.plugins.datadog.traces.write.Track;
import org.jenkinsci.plugins.workflow.graph.FlowNode;
Expand Down Expand Up @@ -268,43 +268,43 @@ private static final class StubTraceWriteStrategy implements TraceWriteStrategy
private final Collection<JSONObject> webhooks = new LinkedBlockingQueue<>();

@Override
public Span createSpan(BuildData buildData, Run<?, ?> run) {
public Payload serialize(BuildData buildData, Run<?, ?> run) {
if (isWebhook) {
JSONObject json = new DatadogWebhookBuildLogic().finishBuildTrace(buildData, run);
if (json == null) {
return null;
}
webhooks.add(json);
return new Span(json, Track.WEBHOOK);
return new Payload(json, Track.WEBHOOK);
} else {
TraceSpan span = new DatadogTraceBuildLogic().createSpan(buildData, run);
if (span == null) {
return null;
}
traces.add(span);
JSONObject json = new JsonTraceSpanMapper().map(span);
return new Span(json, Track.APM);
return new Payload(json, Track.APM);
}
}

@Nonnull
@Override
public Collection<Span> createSpan(FlowNode flowNode, Run<?, ?> run) {
public Collection<Payload> serialize(FlowNode flowNode, Run<?, ?> run) {
if (isWebhook) {
Collection<JSONObject> jsons = new DatadogWebhookPipelineLogic().execute(flowNode, run);
webhooks.addAll(jsons);
return jsons.stream().map(payload -> new Span(payload, Track.WEBHOOK)).collect(Collectors.toList());
return jsons.stream().map(payload -> new Payload(payload, Track.WEBHOOK)).collect(Collectors.toList());
} else {
Collection<TraceSpan> traceSpans = new DatadogTracePipelineLogic().collectTraces(flowNode, run);
traces.addAll(traceSpans);
JsonTraceSpanMapper mapper = new JsonTraceSpanMapper();
List<JSONObject> jsons = traceSpans.stream().map(mapper::map).collect(Collectors.toList());
return jsons.stream().map(payload -> new Span(payload, Track.APM)).collect(Collectors.toList());
return jsons.stream().map(payload -> new Payload(payload, Track.APM)).collect(Collectors.toList());
}
}

@Override
public void send(Collection<Span> spans) {
public void send(Collection<Payload> spans) {
// no op
}

Expand Down

0 comments on commit 1756c79

Please sign in to comment.