Skip to content

Commit

Permalink
[controller] Log request and response body for aggregated stoppable c…
Browse files Browse the repository at this point in the history
…heck (#1320)

Currently, controller logs audit logs for API requests and responses. These audit logs however, do not log the request body.
In the aggregated stoppable checks API endpoint, the request and response are JSON objects. This PR logs these objects to get visibility into them.
  • Loading branch information
nisargthakkar authored Nov 19, 2024
1 parent 92de3d3 commit 18f0d37
Show file tree
Hide file tree
Showing 4 changed files with 193 additions and 29 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.venice.controllerapi;

import java.util.Collections;
import java.util.List;
import java.util.Map;

Expand All @@ -9,6 +10,10 @@ public class StoppableNodeStatusResponse extends ControllerResponse {
private Map<String, String> nonStoppableInstancesWithReasons;

public Map<String, String> getNonStoppableInstancesWithReasons() {
if (nonStoppableInstancesWithReasons == null) {
return Collections.emptyMap();
}

return nonStoppableInstancesWithReasons;
}

Expand All @@ -17,10 +22,14 @@ public void setNonStoppableInstancesWithReason(Map<String, String> nonStoppableI
}

public List<String> getStoppableInstances() {
if (stoppableInstances == null) {
return Collections.emptyList();
}

return stoppableInstances;
}

public void setStoppableInstances(List<String> remoableInstances) {
this.stoppableInstances = remoableInstances;
public void setStoppableInstances(List<String> stoppableInstances) {
this.stoppableInstances = stoppableInstances;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.linkedin.venice.controllerapi;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.linkedin.venice.utils.ObjectMapperFactory;
import java.util.Arrays;
import java.util.HashMap;
import org.testng.annotations.Test;


public class TestStoppableNodeStatusResponse {
private static final ObjectMapper OBJECT_MAPPER = ObjectMapperFactory.getInstance();

@Test
public void testJsonSerialization() throws JsonProcessingException {
StoppableNodeStatusResponse response = new StoppableNodeStatusResponse();
response.setStoppableInstances(Arrays.asList("instance1", "instance2"));
response.setNonStoppableInstancesWithReason(new HashMap<String, String>() {
{
put("instance3", "reason1");
put("instance4", "reason2");
}
});

String serializedResponse = OBJECT_MAPPER.writeValueAsString(response);
JsonNode deserializedResponse = OBJECT_MAPPER.readTree(serializedResponse);

assertTrue(deserializedResponse.get("stoppableInstances") instanceof ArrayNode);
assertEquals(deserializedResponse.get("stoppableInstances").get(0).asText(), "instance1");
assertEquals(deserializedResponse.get("stoppableInstances").get(1).asText(), "instance2");
assertTrue(deserializedResponse.get("nonStoppableInstancesWithReasons") instanceof ObjectNode);
assertEquals(deserializedResponse.get("nonStoppableInstancesWithReasons").get("instance3").asText(), "reason1");
assertEquals(deserializedResponse.get("nonStoppableInstancesWithReasons").get("instance4").asText(), "reason2");
}

@Test
public void testJsonSerializationForEmptyContent() throws JsonProcessingException {
StoppableNodeStatusResponse response = new StoppableNodeStatusResponse();

String serializedResponse = OBJECT_MAPPER.writeValueAsString(response);
JsonNode deserializedResponse = OBJECT_MAPPER.readTree(serializedResponse);

assertTrue(deserializedResponse.get("stoppableInstances").isEmpty());
assertTrue(deserializedResponse.get("nonStoppableInstancesWithReasons").isEmpty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,14 @@
import java.util.List;
import java.util.Optional;
import org.apache.http.HttpStatus;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import spark.Request;
import spark.Route;


public class ControllerRoutes extends AbstractRoute {
private static final Logger LOGGER = LogManager.getLogger(ControllerRoutes.class);
private static final ObjectMapper OBJECT_MAPPER = ObjectMapperFactory.getInstance();

private final PubSubTopicRepository pubSubTopicRepository;
Expand Down Expand Up @@ -194,6 +197,7 @@ public Route getAggregatedHealthStatus(Admin admin) {
response.type(HttpConstants.JSON);

try {
LOGGER.info("[AggregatedHealthStatus] Received request: {}", request.body());
AggregatedHealthStatusRequest statusRequest =
OBJECT_MAPPER.readValue(request.body(), AggregatedHealthStatusRequest.class);
String cluster = statusRequest.getClusterId();
Expand All @@ -210,10 +214,10 @@ public Route getAggregatedHealthStatus(Admin admin) {

InstanceRemovableStatuses statuses =
admin.getAggregatedHealthStatus(cluster, instanceList, toBeStoppedInstanceList, isSslEnabled());
if (statuses.getRedirectUrl() != null) {
response.redirect(
statuses.getRedirectUrl() + AGGREGATED_HEALTH_STATUS.getPath(),
HttpStatus.SC_MOVED_TEMPORARILY);
String redirectUrl = statuses.getRedirectUrl();
if (redirectUrl != null) {
LOGGER.info("[AggregatedHealthStatus] Redirecting to: {}", redirectUrl);
response.redirect(redirectUrl + AGGREGATED_HEALTH_STATUS.getPath(), HttpStatus.SC_MOVED_TEMPORARILY);
return null;
} else {
responseObject.setNonStoppableInstancesWithReason(statuses.getNonStoppableInstancesWithReasons());
Expand All @@ -223,7 +227,9 @@ public Route getAggregatedHealthStatus(Admin admin) {
responseObject.setError(e);
AdminSparkServer.handleError(e, request, response);
}
return AdminSparkServer.OBJECT_MAPPER.writeValueAsString(responseObject);
String responseContent = AdminSparkServer.OBJECT_MAPPER.writeValueAsString(responseObject);
LOGGER.info("[AggregatedHealthStatus] Response: {}", responseContent);
return responseContent;
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,34 @@
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.venice.controller.Admin;
import com.linkedin.venice.controller.InstanceRemovableStatuses;
import com.linkedin.venice.controller.VeniceParentHelixAdmin;
import com.linkedin.venice.controllerapi.AggregatedHealthStatusRequest;
import com.linkedin.venice.controllerapi.ControllerApiConstants;
import com.linkedin.venice.controllerapi.LeaderControllerResponse;
import com.linkedin.venice.controllerapi.StoppableNodeStatusResponse;
import com.linkedin.venice.meta.Instance;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.utils.ObjectMapperFactory;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.testng.Assert;
import org.testng.annotations.Test;
import spark.Request;
import spark.Response;
import spark.Route;


public class ControllerRoutesTest {
private static final ObjectMapper OBJECT_MAPPER = ObjectMapperFactory.getInstance();
private static final String TEST_CLUSTER = "test_cluster";
private static final String TEST_NODE_ID = "l2181";
private static final String TEST_HOST = "localhost";
Expand All @@ -41,34 +52,120 @@ public void testGetLeaderController() throws Exception {

Route leaderControllerRoute =
new ControllerRoutes(false, Optional.empty(), pubSubTopicRepository).getLeaderController(mockAdmin);
LeaderControllerResponse leaderControllerResponse = ObjectMapperFactory.getInstance()
.readValue(
leaderControllerRoute.handle(request, mock(Response.class)).toString(),
LeaderControllerResponse.class);
Assert.assertEquals(leaderControllerResponse.getCluster(), TEST_CLUSTER);
Assert.assertEquals(leaderControllerResponse.getUrl(), "http://" + TEST_HOST + ":" + TEST_PORT);
Assert.assertEquals(leaderControllerResponse.getSecureUrl(), "https://" + TEST_HOST + ":" + TEST_SSL_PORT);
LeaderControllerResponse leaderControllerResponse = OBJECT_MAPPER.readValue(
leaderControllerRoute.handle(request, mock(Response.class)).toString(),
LeaderControllerResponse.class);
assertEquals(leaderControllerResponse.getCluster(), TEST_CLUSTER);
assertEquals(leaderControllerResponse.getUrl(), "http://" + TEST_HOST + ":" + TEST_PORT);
assertEquals(leaderControllerResponse.getSecureUrl(), "https://" + TEST_HOST + ":" + TEST_SSL_PORT);

Route leaderControllerSslRoute =
new ControllerRoutes(true, Optional.empty(), pubSubTopicRepository).getLeaderController(mockAdmin);
LeaderControllerResponse leaderControllerResponseSsl = ObjectMapperFactory.getInstance()
.readValue(
leaderControllerSslRoute.handle(request, mock(Response.class)).toString(),
LeaderControllerResponse.class);
Assert.assertEquals(leaderControllerResponseSsl.getCluster(), TEST_CLUSTER);
Assert.assertEquals(leaderControllerResponseSsl.getUrl(), "https://" + TEST_HOST + ":" + TEST_SSL_PORT);
Assert.assertEquals(leaderControllerResponseSsl.getSecureUrl(), "https://" + TEST_HOST + ":" + TEST_SSL_PORT);
LeaderControllerResponse leaderControllerResponseSsl = OBJECT_MAPPER.readValue(
leaderControllerSslRoute.handle(request, mock(Response.class)).toString(),
LeaderControllerResponse.class);
assertEquals(leaderControllerResponseSsl.getCluster(), TEST_CLUSTER);
assertEquals(leaderControllerResponseSsl.getUrl(), "https://" + TEST_HOST + ":" + TEST_SSL_PORT);
assertEquals(leaderControllerResponseSsl.getSecureUrl(), "https://" + TEST_HOST + ":" + TEST_SSL_PORT);

// Controller doesn't support SSL
Instance leaderNonSslController = new Instance(TEST_NODE_ID, TEST_HOST, TEST_PORT, TEST_PORT);
doReturn(leaderNonSslController).when(mockAdmin).getLeaderController(anyString());

LeaderControllerResponse leaderControllerNonSslResponse = ObjectMapperFactory.getInstance()
.readValue(
leaderControllerRoute.handle(request, mock(Response.class)).toString(),
LeaderControllerResponse.class);
Assert.assertEquals(leaderControllerNonSslResponse.getCluster(), TEST_CLUSTER);
Assert.assertEquals(leaderControllerNonSslResponse.getUrl(), "http://" + TEST_HOST + ":" + TEST_PORT);
Assert.assertEquals(leaderControllerNonSslResponse.getSecureUrl(), null);
LeaderControllerResponse leaderControllerNonSslResponse = OBJECT_MAPPER.readValue(
leaderControllerRoute.handle(request, mock(Response.class)).toString(),
LeaderControllerResponse.class);
assertEquals(leaderControllerNonSslResponse.getCluster(), TEST_CLUSTER);
assertEquals(leaderControllerNonSslResponse.getUrl(), "http://" + TEST_HOST + ":" + TEST_PORT);
assertEquals(leaderControllerNonSslResponse.getSecureUrl(), null);
}

@Test
public void testGetAggregatedHealthStatus() throws Exception {
ControllerRoutes controllerRoutes = new ControllerRoutes(false, Optional.empty(), pubSubTopicRepository);
Admin mockAdmin = mock(VeniceParentHelixAdmin.class);

List<String> instanceList = Arrays.asList("instance1_5000", "instance2_5000");
List<String> toBeStoppedInstanceList = Arrays.asList("instance3_5000", "instance4_5000");

AggregatedHealthStatusRequest requestBody =
new AggregatedHealthStatusRequest(TEST_CLUSTER, instanceList, toBeStoppedInstanceList);
String body = OBJECT_MAPPER.writeValueAsString(requestBody);

Request request = mock(Request.class);
doReturn(body).when(request).body();

// Test redirect
InstanceRemovableStatuses redirectStatuses = new InstanceRemovableStatuses();
redirectStatuses.setRedirectUrl("http://redirect.com");
doReturn(redirectStatuses).when(mockAdmin)
.getAggregatedHealthStatus(TEST_CLUSTER, instanceList, toBeStoppedInstanceList, false);
Response redirectResponse = mock(Response.class);
controllerRoutes.getAggregatedHealthStatus(mockAdmin).handle(request, redirectResponse);
verify(redirectResponse).redirect("http://redirect.com/aggregatedHealthStatus", 302);

// Test non-removable instances
Map<String, String> nonStoppableInstances = new HashMap() {
{
put("instance1_5000", "reason1");
put("instance2_5000", "reason2");
}
};
InstanceRemovableStatuses nonRemovableStatus = new InstanceRemovableStatuses();
nonRemovableStatus.setNonStoppableInstancesWithReasons(nonStoppableInstances);

doReturn(nonRemovableStatus).when(mockAdmin)
.getAggregatedHealthStatus(TEST_CLUSTER, instanceList, toBeStoppedInstanceList, false);
Response nonRemovableResponse = mock(Response.class);
StoppableNodeStatusResponse nonRemovableStoppableResponse = OBJECT_MAPPER.readValue(
controllerRoutes.getAggregatedHealthStatus(mockAdmin).handle(request, nonRemovableResponse).toString(),
StoppableNodeStatusResponse.class);
assertTrue(nonRemovableStoppableResponse.getStoppableInstances().isEmpty());
assertEquals(nonRemovableStoppableResponse.getNonStoppableInstancesWithReasons().size(), 2);
assertEquals(nonRemovableStoppableResponse.getNonStoppableInstancesWithReasons().get("instance1_5000"), "reason1");
assertEquals(nonRemovableStoppableResponse.getNonStoppableInstancesWithReasons().get("instance2_5000"), "reason2");

// Test removable instances
List<String> stoppableInstances = Arrays.asList("instance1_5000", "instance2_5000");
InstanceRemovableStatuses removableStatus = new InstanceRemovableStatuses();
removableStatus.setStoppableInstances(stoppableInstances);

doReturn(removableStatus).when(mockAdmin)
.getAggregatedHealthStatus(TEST_CLUSTER, instanceList, toBeStoppedInstanceList, false);
Response removableResponse = mock(Response.class);
StoppableNodeStatusResponse removableStoppableResponse = OBJECT_MAPPER.readValue(
controllerRoutes.getAggregatedHealthStatus(mockAdmin).handle(request, removableResponse).toString(),
StoppableNodeStatusResponse.class);
assertTrue(removableStoppableResponse.getNonStoppableInstancesWithReasons().isEmpty());
assertEquals(removableStoppableResponse.getStoppableInstances().size(), 2);
assertEquals(removableStoppableResponse.getStoppableInstances().get(0), "instance1_5000");
assertEquals(removableStoppableResponse.getStoppableInstances().get(1), "instance2_5000");

// Test removable and non removable instances
List<String> stoppableInstances1 = Arrays.asList("instance1_5000");
Map<String, String> nonStoppableInstances1 = new HashMap() {
{
put("instance2_5000", "reason2");
}
};

InstanceRemovableStatuses removableAndNonRemovableStatus = new InstanceRemovableStatuses();
removableAndNonRemovableStatus.setStoppableInstances(stoppableInstances1);
removableAndNonRemovableStatus.setNonStoppableInstancesWithReasons(nonStoppableInstances1);

doReturn(removableAndNonRemovableStatus).when(mockAdmin)
.getAggregatedHealthStatus(TEST_CLUSTER, instanceList, toBeStoppedInstanceList, false);
Response removableAndNonRemovableResponse = mock(Response.class);
StoppableNodeStatusResponse removableAndNonRemovableStoppableResponse = OBJECT_MAPPER.readValue(
controllerRoutes.getAggregatedHealthStatus(mockAdmin)
.handle(request, removableAndNonRemovableResponse)
.toString(),
StoppableNodeStatusResponse.class);
assertEquals(removableAndNonRemovableStoppableResponse.getNonStoppableInstancesWithReasons().size(), 1);
assertEquals(
removableAndNonRemovableStoppableResponse.getNonStoppableInstancesWithReasons().get("instance2_5000"),
"reason2");
assertEquals(removableAndNonRemovableStoppableResponse.getStoppableInstances().size(), 1);
assertEquals(removableAndNonRemovableStoppableResponse.getStoppableInstances().get(0), "instance1_5000");
}
}

0 comments on commit 18f0d37

Please sign in to comment.