From 46d9d6f9ea76df12aba48eb4ae45ef63cca89b18 Mon Sep 17 00:00:00 2001 From: Guangning E Date: Mon, 19 Dec 2022 14:10:28 +0800 Subject: [PATCH] [fix][broker]Update interceptor handler exception (#18940) (cherry picked from commit e07b67fd8c5c5cc0fd41ca0c0d956b0cb514c96e) (cherry picked from commit ad9c133a5ded46c9adad099ea77630cdec8bb9bf) --- .../pulsar/broker/web/ExceptionHandler.java | 27 ++++++++++++----- .../intercept/BrokerInterceptorTest.java | 30 +++++++++++++++++++ .../intercept/CounterBrokerInterceptor.java | 21 ++++++++++--- 3 files changed, 66 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ExceptionHandler.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ExceptionHandler.java index 8b200a8b9f6e7..b70168853a70e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ExceptionHandler.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ExceptionHandler.java @@ -25,6 +25,11 @@ import javax.servlet.http.HttpServletResponse; import javax.ws.rs.core.Response; import org.apache.pulsar.common.intercept.InterceptException; +import org.apache.pulsar.common.policies.data.ErrorData; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.eclipse.jetty.http.HttpField; +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.MetaData; @@ -35,16 +40,22 @@ public class ExceptionHandler { public void handle(ServletResponse response, Exception ex) throws IOException { if (ex instanceof InterceptException) { - String reason = ex.getMessage(); - byte[] content = reason.getBytes(StandardCharsets.UTF_8); - MetaData.Response info = new MetaData.Response(); - info.setHttpVersion(HttpVersion.HTTP_1_1); - info.setReason(reason); - info.setStatus(((InterceptException) ex).getErrorCode()); - info.setContentLength(content.length); if (response instanceof org.eclipse.jetty.server.Response) { + String errorData = ObjectMapperFactory + .getThreadLocal().writeValueAsString(new ErrorData(ex.getMessage())); + byte[] errorBytes = errorData.getBytes(StandardCharsets.UTF_8); + int errorCode = ((InterceptException) ex).getErrorCode(); + HttpFields httpFields = new HttpFields(); + HttpField httpField = new HttpField(HttpHeader.CONTENT_TYPE, "application/json;charset=utf-8"); + httpFields.add(httpField); + MetaData.Response info = new MetaData.Response(HttpVersion.HTTP_1_1, errorCode, httpFields); + info.setHttpVersion(HttpVersion.HTTP_1_1); + info.setReason(errorData); + info.setStatus(errorCode); + info.setContentLength(errorBytes.length); ((org.eclipse.jetty.server.Response) response).getHttpChannel().sendResponse(info, - ByteBuffer.wrap(content), true); + ByteBuffer.wrap(errorBytes), + true); } else { ((HttpServletResponse) response).sendError(((InterceptException) ex).getErrorCode(), ex.getMessage()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java index 0e46b147e7b2d..37c0f8fddc185 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java @@ -32,6 +32,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.nar.NarClassLoader; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -40,7 +41,9 @@ import java.io.IOException; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletableFuture; import static org.mockito.ArgumentMatchers.same; @@ -209,8 +212,35 @@ public void onResponse(Call call, Response response) throws IOException { Awaitility.await().until(() -> !interceptor.getResponseList().isEmpty()); CounterBrokerInterceptor.ResponseEvent responseEvent = interceptor.getResponseList().get(0); Assert.assertEquals(responseEvent.getRequestUri(), "/admin/v3/test/asyncGet/my-topic/1000"); + Assert.assertEquals(responseEvent.getResponseStatus(), javax.ws.rs.core.Response.noContent().build().getStatus()); } + public void requestInterceptorFailedTest() { + Set allowedClusters = new HashSet<>(); + allowedClusters.add(configClusterName); + TenantInfoImpl tenantInfo = new TenantInfoImpl(new HashSet<>(), allowedClusters); + try { + admin.tenants().createTenant("test-interceptor-failed-tenant", tenantInfo); + Assert.fail("Create tenant because interceptor should fail"); + } catch (PulsarAdminException e) { + Assert.assertEquals(e.getHttpError(), "Create tenant failed"); + } + + try { + admin.namespaces().createNamespace("public/test-interceptor-failed-namespace"); + Assert.fail("Create namespace because interceptor should fail"); + } catch (PulsarAdminException e) { + Assert.assertEquals(e.getHttpError(), "Create namespace failed"); + } + + try { + admin.topics().createNonPartitionedTopic("persistent://public/default/test-interceptor-failed-topic"); + Assert.fail("Create topic because interceptor should fail"); + } catch (PulsarAdminException e) { + Assert.assertEquals(e.getHttpError(), "Create topic failed"); + } + } + } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java index da4d4f0163172..05616ef0bcff2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java @@ -32,6 +32,7 @@ import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Entry; +import org.apache.http.HttpStatus; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Producer; @@ -41,6 +42,7 @@ import org.apache.pulsar.common.api.proto.BaseCommand; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.CommandAck; +import org.apache.pulsar.common.intercept.InterceptException; import org.eclipse.jetty.server.Response; @@ -147,10 +149,20 @@ public void onConnectionClosed(ServerCnx cnx) { } @Override - public void onWebserviceRequest(ServletRequest request) { - count ++; + public void onWebserviceRequest(ServletRequest request) throws IOException, ServletException, InterceptException { + count++; + String url = ((HttpServletRequest) request).getRequestURL().toString(); if (log.isDebugEnabled()) { - log.debug("[{}] On [{}] Webservice request", count, ((HttpServletRequest) request).getRequestURL().toString()); + log.debug("[{}] On [{}] Webservice request", count, url); + } + if (url.contains("/admin/v2/tenants/test-interceptor-failed-tenant")) { + throw new InterceptException(HttpStatus.SC_PRECONDITION_FAILED, "Create tenant failed"); + } + if (url.contains("/admin/v2/namespaces/public/test-interceptor-failed-namespace")) { + throw new InterceptException(HttpStatus.SC_PRECONDITION_FAILED, "Create namespace failed"); + } + if (url.contains("/admin/v2/persistent/public/default/test-interceptor-failed-topic")) { + throw new InterceptException(HttpStatus.SC_PRECONDITION_FAILED, "Create topic failed"); } } @@ -158,7 +170,8 @@ public void onWebserviceRequest(ServletRequest request) { public void onWebserviceResponse(ServletRequest request, ServletResponse response) { count ++; if (log.isDebugEnabled()) { - log.debug("[{}] On [{}] Webservice response {}", count, ((HttpServletRequest) request).getRequestURL().toString(), response); + log.debug("[{}] On [{}] Webservice response {}", + count, ((HttpServletRequest) request).getRequestURL().toString(), response); } if (response instanceof Response) { Response res = (Response) response;