Skip to content

Commit

Permalink
[fix][broker]Update interceptor handler exception (apache#18940)
Browse files Browse the repository at this point in the history
(cherry picked from commit e07b67f)
(cherry picked from commit ad9c133)
  • Loading branch information
tuteng authored and nicoloboschi committed Jan 11, 2023
1 parent 1921c46 commit 46d9d6f
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.Response;
import org.apache.pulsar.common.intercept.InterceptException;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;

Expand All @@ -35,16 +40,22 @@ public class ExceptionHandler {

public void handle(ServletResponse response, Exception ex) throws IOException {
if (ex instanceof InterceptException) {
String reason = ex.getMessage();
byte[] content = reason.getBytes(StandardCharsets.UTF_8);
MetaData.Response info = new MetaData.Response();
info.setHttpVersion(HttpVersion.HTTP_1_1);
info.setReason(reason);
info.setStatus(((InterceptException) ex).getErrorCode());
info.setContentLength(content.length);
if (response instanceof org.eclipse.jetty.server.Response) {
String errorData = ObjectMapperFactory
.getThreadLocal().writeValueAsString(new ErrorData(ex.getMessage()));
byte[] errorBytes = errorData.getBytes(StandardCharsets.UTF_8);
int errorCode = ((InterceptException) ex).getErrorCode();
HttpFields httpFields = new HttpFields();
HttpField httpField = new HttpField(HttpHeader.CONTENT_TYPE, "application/json;charset=utf-8");
httpFields.add(httpField);
MetaData.Response info = new MetaData.Response(HttpVersion.HTTP_1_1, errorCode, httpFields);
info.setHttpVersion(HttpVersion.HTTP_1_1);
info.setReason(errorData);
info.setStatus(errorCode);
info.setContentLength(errorBytes.length);
((org.eclipse.jetty.server.Response) response).getHttpChannel().sendResponse(info,
ByteBuffer.wrap(content), true);
ByteBuffer.wrap(errorBytes),
true);
} else {
((HttpServletResponse) response).sendError(((InterceptException) ex).getErrorCode(),
ex.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
Expand All @@ -40,7 +41,9 @@

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import static org.mockito.ArgumentMatchers.same;
Expand Down Expand Up @@ -209,8 +212,35 @@ public void onResponse(Call call, Response response) throws IOException {
Awaitility.await().until(() -> !interceptor.getResponseList().isEmpty());
CounterBrokerInterceptor.ResponseEvent responseEvent = interceptor.getResponseList().get(0);
Assert.assertEquals(responseEvent.getRequestUri(), "/admin/v3/test/asyncGet/my-topic/1000");

Assert.assertEquals(responseEvent.getResponseStatus(),
javax.ws.rs.core.Response.noContent().build().getStatus());
}

public void requestInterceptorFailedTest() {
Set<String> allowedClusters = new HashSet<>();
allowedClusters.add(configClusterName);
TenantInfoImpl tenantInfo = new TenantInfoImpl(new HashSet<>(), allowedClusters);
try {
admin.tenants().createTenant("test-interceptor-failed-tenant", tenantInfo);
Assert.fail("Create tenant because interceptor should fail");
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getHttpError(), "Create tenant failed");
}

try {
admin.namespaces().createNamespace("public/test-interceptor-failed-namespace");
Assert.fail("Create namespace because interceptor should fail");
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getHttpError(), "Create namespace failed");
}

try {
admin.topics().createNonPartitionedTopic("persistent://public/default/test-interceptor-failed-topic");
Assert.fail("Create topic because interceptor should fail");
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getHttpError(), "Create topic failed");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.http.HttpStatus;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Producer;
Expand All @@ -41,6 +42,7 @@
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.intercept.InterceptException;
import org.eclipse.jetty.server.Response;


Expand Down Expand Up @@ -147,18 +149,29 @@ public void onConnectionClosed(ServerCnx cnx) {
}

@Override
public void onWebserviceRequest(ServletRequest request) {
count ++;
public void onWebserviceRequest(ServletRequest request) throws IOException, ServletException, InterceptException {
count++;
String url = ((HttpServletRequest) request).getRequestURL().toString();
if (log.isDebugEnabled()) {
log.debug("[{}] On [{}] Webservice request", count, ((HttpServletRequest) request).getRequestURL().toString());
log.debug("[{}] On [{}] Webservice request", count, url);
}
if (url.contains("/admin/v2/tenants/test-interceptor-failed-tenant")) {
throw new InterceptException(HttpStatus.SC_PRECONDITION_FAILED, "Create tenant failed");
}
if (url.contains("/admin/v2/namespaces/public/test-interceptor-failed-namespace")) {
throw new InterceptException(HttpStatus.SC_PRECONDITION_FAILED, "Create namespace failed");
}
if (url.contains("/admin/v2/persistent/public/default/test-interceptor-failed-topic")) {
throw new InterceptException(HttpStatus.SC_PRECONDITION_FAILED, "Create topic failed");
}
}

@Override
public void onWebserviceResponse(ServletRequest request, ServletResponse response) {
count ++;
if (log.isDebugEnabled()) {
log.debug("[{}] On [{}] Webservice response {}", count, ((HttpServletRequest) request).getRequestURL().toString(), response);
log.debug("[{}] On [{}] Webservice response {}",
count, ((HttpServletRequest) request).getRequestURL().toString(), response);
}
if (response instanceof Response) {
Response res = (Response) response;
Expand Down

0 comments on commit 46d9d6f

Please sign in to comment.