Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
nikita-tkachenko-datadog committed Nov 19, 2024
1 parent 6c8f98b commit 4a5c1a1
Show file tree
Hide file tree
Showing 21 changed files with 196 additions and 55 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.datadog.jenkins.plugins.datadog.clients;

import net.sf.json.JSONObject;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
Expand All @@ -20,18 +21,18 @@ public class CompressedBatchSender<T> implements JsonPayloadSender<T> {
private final String url;
private final Map<String, String> headers;
private final int batchLimitBytes;
private final Function<T, String> payloadToString;
private final Function<T, JSONObject> payloadToJson;

public CompressedBatchSender(HttpClient httpClient,
String url,
Map<String, String> headers,
int batchLimitBytes,
Function<T, String> payloadToString) {
Function<T, JSONObject> payloadToJson) {
this.httpClient = httpClient;
this.url = url;
this.headers = headers;
this.batchLimitBytes = batchLimitBytes;
this.payloadToString = payloadToString;
this.payloadToJson = payloadToJson;
}

@Override
Expand All @@ -42,7 +43,8 @@ public void send(Collection<T> payloads) throws Exception {
int uncompressedRequestLength = 0;

for (T payload : payloads) {
byte[] body = payloadToString.apply(payload).getBytes(StandardCharsets.UTF_8);
JSONObject json = payloadToJson.apply(payload);
byte[] body = json.toString().getBytes(StandardCharsets.UTF_8);
if (body.length + 2 > batchLimitBytes) { // + 2 is for array beginning and end: [<payload>]
logger.severe("Dropping a payload because size (" + body.length + ") exceeds the allowed limit of " + batchLimitBytes);
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ private static final class AgentLogWriteStrategy implements LogWriteStrategy {
private final String host;
private final int port;

private final CircuitBreaker<List<String>> circuitBreaker;
private final CircuitBreaker<List<net.sf.json.JSONObject>> circuitBreaker;

private Socket socket;
private OutputStream out;
Expand All @@ -320,17 +320,17 @@ private AgentLogWriteStrategy(String host, int port) {
CircuitBreaker.DEFAULT_DELAY_FACTOR);
}

public void send(List<String> payloads) {
public void send(List<net.sf.json.JSONObject> payloads) {
circuitBreaker.accept(payloads);
}

private void doSend(List<String> payloads) throws Exception {
private void doSend(List<net.sf.json.JSONObject> payloads) throws Exception {
if (socket == null || socket.isClosed() || !socket.isConnected()) {
socket = new Socket(host, port);
out = new BufferedOutputStream(socket.getOutputStream());
}
for (String payload : payloads) {
out.write(payload.getBytes(StandardCharsets.UTF_8));
for (net.sf.json.JSONObject payload : payloads) {
out.write(payload.toString().getBytes(StandardCharsets.UTF_8));
out.write(LINE_SEPARATOR);
}
}
Expand All @@ -340,7 +340,7 @@ private void handleError(Exception e) {
DatadogUtilities.severe(logger, e, "Could not write logs to agent");
}

private void fallback(List<String> payloads) {
private void fallback(List<net.sf.json.JSONObject> payloads) {
// cannot establish connection to agent, do nothing
}

Expand Down Expand Up @@ -381,12 +381,12 @@ public TraceWriteStrategy createTraceWriteStrategy() {
"X-Datadog-EVP-Subdomain", "webhook-intake",
"DD-CI-PROVIDER-NAME", "jenkins",
"Content-Encoding", "gzip");
payloadSender = new CompressedBatchSender<>(client, url, headers, PAYLOAD_SIZE_LIMIT, p -> p.getJson().toString());
payloadSender = new CompressedBatchSender<>(client, url, headers, PAYLOAD_SIZE_LIMIT, p -> p.getJson());
} else {
Map<String, String> headers = Map.of(
"X-Datadog-EVP-Subdomain", "webhook-intake",
"DD-CI-PROVIDER-NAME", "jenkins");
payloadSender = new SimpleSender<>(client, url, headers, p -> p.getJson().toString());
payloadSender = new SimpleSender<>(client, url, headers, p -> p.getJson());
}

TraceWriteStrategyImpl evpStrategy = new TraceWriteStrategyImpl(Track.WEBHOOK, payloadSender::send);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,24 +157,17 @@ public static boolean validateWebhookIntakeConnection(String webhookIntakeUrl, S
}

public static boolean validateLogIntakeConnection(String logsIntakeUrl, Secret apiKey) {
String payload = "{\"message\":\"[datadog-plugin] Check connection\", " +
"\"ddsource\":\"Jenkins\", \"service\":\"Jenkins\", " +
"\"hostname\":\"" + DatadogUtilities.getHostname(null) + "\"}";
return postLogs(new HttpClient(HTTP_TIMEOUT_MS), logsIntakeUrl, apiKey, payload);
}

private static boolean postLogs(HttpClient httpClient, String logIntakeUrl, Secret apiKey, String payload) {
if(payload == null){
logger.fine("No payload to post");
return true;
}
HttpClient httpClient = new HttpClient(HTTP_TIMEOUT_MS);

Map<String, String> headers = new HashMap<>();
headers.put("DD-API-KEY", Secret.toString(apiKey));

String payload = "{\"message\":\"[datadog-plugin] Check connection\", " +
"\"ddsource\":\"Jenkins\", \"service\":\"Jenkins\", " +
"\"hostname\":\"" + DatadogUtilities.getHostname(null) + "\"}";
byte[] body = payload.getBytes(StandardCharsets.UTF_8);
try {
httpClient.post(logIntakeUrl, headers, "application/json", body, Function.identity());
httpClient.post(logsIntakeUrl, headers, "application/json", body, Function.identity());
return true;
} catch (Exception e) {
DatadogUtilities.severe(logger, e, "Failed to post logs");
Expand Down Expand Up @@ -323,13 +316,13 @@ public LogWriteStrategy createLogWriteStrategy() {
}

private static final class ApiLogWriteStrategy implements LogWriteStrategy {
private final CircuitBreaker<List<String>> circuitBreaker;
private final CircuitBreaker<List<JSONObject>> circuitBreaker;

public ApiLogWriteStrategy(String logIntakeUrl, Secret apiKey, HttpClient httpClient) {
Map<String, String> headers = Map.of(
"DD-API-KEY", Secret.toString(apiKey),
"Content-Encoding", "gzip");
JsonPayloadSender<String> payloadSender = new CompressedBatchSender<>(
JsonPayloadSender<JSONObject> payloadSender = new CompressedBatchSender<>(
httpClient,
logIntakeUrl,
headers,
Expand All @@ -346,15 +339,15 @@ public ApiLogWriteStrategy(String logIntakeUrl, Secret apiKey, HttpClient httpCl
}

@Override
public void send(List<String> logs) {
public void send(List<JSONObject> logs) {
circuitBreaker.accept(logs);
}

private void handleError(Exception e) {
DatadogUtilities.severe(logger, e, "Failed to post logs");
}

private void fallback(List<String> payloads) {
private void fallback(List<JSONObject> payloads) {
// cannot establish connection to API, do nothing
}

Expand All @@ -377,12 +370,12 @@ public TraceWriteStrategy createTraceWriteStrategy() {
"DD-API-KEY", Secret.toString(apiKey),
"DD-CI-PROVIDER-NAME", "jenkins",
"Content-Encoding", "gzip");
payloadSender = new CompressedBatchSender<>(httpClient, url, headers, PAYLOAD_SIZE_LIMIT, p -> p.getJson().toString());
payloadSender = new CompressedBatchSender<>(httpClient, url, headers, PAYLOAD_SIZE_LIMIT, p -> p.getJson());
} else {
Map<String, String> headers = Map.of(
"DD-API-KEY", Secret.toString(apiKey),
"DD-CI-PROVIDER-NAME", "jenkins");
payloadSender = new SimpleSender<>(httpClient, url, headers, p -> p.getJson().toString());
payloadSender = new SimpleSender<>(httpClient, url, headers, p -> p.getJson());
}

return new TraceWriteStrategyImpl(Track.WEBHOOK, payloadSender::send);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.datadog.jenkins.plugins.datadog.clients;

import java.util.Collection;
import java.util.function.Function;

public interface JsonPayloadSender<T> {
void send(Collection<T> payloads) throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,30 @@
import java.util.Collection;
import java.util.Map;
import java.util.function.Function;
import net.sf.json.JSONObject;

public class SimpleSender<T> implements JsonPayloadSender<T> {

private final HttpClient httpClient;
private final String url;
private final Map<String, String> headers;
private final Function<T, String> payloadToString;
private final Function<T, JSONObject> payloadToJson;

public SimpleSender(HttpClient httpClient,
String url,
Map<String, String> headers,
Function<T, String> payloadToString) {
Function<T, JSONObject> payloadToJson) {
this.httpClient = httpClient;
this.url = url;
this.headers = headers;
this.payloadToString = payloadToString;
this.payloadToJson = payloadToJson;
}

@Override
public void send(Collection<T> payloads) throws Exception {
for (T payload : payloads) {
byte[] body = payloadToString.apply(payload).getBytes(StandardCharsets.UTF_8);
JSONObject json = payloadToJson.apply(payload);
byte[] body = json.toString().getBytes(StandardCharsets.UTF_8);
httpClient.postAsynchronously(url, headers, "application/json", body);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class ConnectivityChecksFlare implements FlareContributor {

@Override
public int order() {
return 2;
return ORDER.CONNECTIVITY_CHECKS;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class DatadogConfigFlare implements FlareContributor {

@Override
public int order() {
return 1;
return ORDER.CONFIG;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public class DatadogEnvVarsFlare implements FlareContributor {

@Override
public int order() {
return 3;
return ORDER.ENV_VARS;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public class ExceptionsFlare implements FlareContributor {

@Override
public int order() {
return 4;
return ORDER.EXCEPTIONS;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,19 @@

public interface FlareContributor {

interface ORDER {
int RUNTIME_INFO = 0;
int CONFIG = 1;
int CONNECTIVITY_CHECKS = 2;
int ENV_VARS = 3;
int WRITERS_HEALTH = 4;
int EXCEPTIONS = 5;
int PLUGIN_LOGS = 6;
int JENKINS_LOGS = 7;
int THREAD_DUMP = 8;
int JFR = 9;
}

default boolean isEnabledByDefault() {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class JenkinsLogsFlare implements FlareContributor {

@Override
public int order() {
return 6;
return ORDER.JENKINS_LOGS;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public boolean isEnabledByDefault() {

@Override
public int order() {
return 8;
return ORDER.JFR;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void onStartup() {

@Override
public int order() {
return 5;
return ORDER.PLUGIN_LOGS;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public class RuntimeInfoFlare implements FlareContributor {

@Override
public int order() {
return 0;
return ORDER.RUNTIME_INFO;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public class ThreadDumpFlare implements FlareContributor {

@Override
public int order() {
return 7;
return ORDER.THREAD_DUMP;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public WritersHealthFlare() {

@Override
public int order() {
return 0;
return ORDER.WRITERS_HEALTH;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ public void write(String line) {
payload.put("timestamp", System.currentTimeMillis());
payload.put(PipelineStepData.StepType.PIPELINE.getTagName() + CITags._NAME, this.buildData.getJobName());

AsyncWriter<String> logWriter = LogWriterFactory.getLogWriter();
AsyncWriter<JSONObject> logWriter = LogWriterFactory.getLogWriter();
if (logWriter != null) {
logWriter.submit(payload.toString());
logWriter.submit(payload);
}

} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package org.datadog.jenkins.plugins.datadog.logs;

import net.sf.json.JSONObject;
import java.util.List;

public interface LogWriteStrategy {

LogWriteStrategy NO_OP = new LogWriteStrategy() {
@Override
public void send(List<String> logs) {
public void send(List<JSONObject> logs) {
// no op
}

Expand All @@ -16,6 +17,6 @@ public void close() {
}
};

void send(List<String> logs);
void send(List<JSONObject> logs);
void close();
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.datadog.jenkins.plugins.datadog.logs;

import hudson.init.Terminator;
import net.sf.json.JSONObject;
import org.datadog.jenkins.plugins.datadog.util.AsyncWriter;
import org.datadog.jenkins.plugins.datadog.DatadogClient;
import org.datadog.jenkins.plugins.datadog.DatadogUtilities;
Expand All @@ -21,7 +22,7 @@ public class LogWriterFactory {
private static final int DEFAULT_POLLING_TIMEOUT_SECONDS = 2;
private static final int DEFAULT_BATCH_SIZE_LIMIT = 500;

private static volatile AsyncWriter<String> LOG_WRITER;
private static volatile AsyncWriter<JSONObject> LOG_WRITER;

public static synchronized void onDatadogClientUpdate(@Nullable DatadogClient client) {
if (client == null) {
Expand Down Expand Up @@ -57,7 +58,7 @@ public static synchronized void stop() throws InterruptedException {
}

@Nullable
public static AsyncWriter<String> getLogWriter() {
public static AsyncWriter<JSONObject> getLogWriter() {
return LOG_WRITER;
}
}
Loading

0 comments on commit 4a5c1a1

Please sign in to comment.