From cd71c726b15408a41f184e4eb6602239a22eec02 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Fri, 12 Apr 2024 18:06:40 +0200 Subject: [PATCH 1/2] Admin API: stream stats --- .../broker/admin/v2/NonPersistentTopics.java | 43 +++++++++++-------- .../broker/admin/v2/PersistentTopics.java | 39 +++++++++++------ .../pulsar/broker/web/PulsarWebResource.java | 17 ++++++++ 3 files changed, 70 insertions(+), 29 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java index f8fe30ccecb2c..52909c926fe06 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java @@ -27,6 +27,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -47,6 +48,7 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; +import javax.ws.rs.core.StreamingOutput; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.service.Topic; @@ -60,6 +62,7 @@ import org.apache.pulsar.common.policies.data.stats.NonPersistentPartitionedTopicStatsImpl; import org.apache.pulsar.common.policies.data.stats.NonPersistentTopicStatsImpl; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -111,8 +114,7 @@ public void getPartitionedMetadata( @ApiResponse(code = 412, message = "Topic name is not valid"), @ApiResponse(code = 500, message = "Internal server error"), }) - public void getInternalStats( - @Suspended final AsyncResponse asyncResponse, + public StreamingOutput getInternalStats( @ApiParam(value = "Specify the tenant", required = true) @PathParam("tenant") String tenant, @ApiParam(value = "Specify the namespace", required = true) @@ -123,21 +125,28 @@ public void getInternalStats( @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @QueryParam("metadata") @DefaultValue("false") boolean metadata) { validateTopicName(tenant, namespace, encodedTopic); - validateTopicOwnershipAsync(topicName, authoritative) - .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.GET_STATS)) - .thenCompose(__ -> { - Topic topic = getTopicReference(topicName); - boolean includeMetadata = metadata && hasSuperUserAccess(); - return topic.getInternalStats(includeMetadata); - }) - .thenAccept(asyncResponse::resume) - .exceptionally(ex -> { - if (isNot307And404Exception(ex)) { - log.error("[{}] Failed to get internal stats for topic {}", clientAppId(), topicName, ex); - } - resumeAsyncResponseExceptionally(asyncResponse, ex); - return null; - }); + return output -> { + validateTopicOwnershipAsync(topicName, authoritative) + .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.GET_STATS)) + .thenCompose(__ -> { + Topic topic = getTopicReference(topicName); + boolean includeMetadata = metadata && hasSuperUserAccess(); + return topic.getInternalStats(includeMetadata); + }) + .thenAccept(stats -> { + try { + ObjectMapperFactory.getMapper().getObjectMapper().writeValue(output, stats); + } catch (Throwable e) { + throw new CompletionException(e); + } + }) + .exceptionally(ex -> { + if (isNot307And404Exception(ex)) { + log.error("[{}] Failed to get internal stats for topic {}", clientAppId(), topicName, ex); + } + throw translateToWebApplicationException(ex); + }); + }; } @PUT diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 94fb1f53ac710..618a3ad2d6adc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -19,16 +19,20 @@ package org.apache.pulsar.broker.admin.v2; import static org.apache.pulsar.common.util.Codec.decode; +import static org.apache.pulsar.common.util.FutureUtil.unwrapCompletionException; import com.fasterxml.jackson.core.JsonProcessingException; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; +import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletionException; +import javax.servlet.http.HttpServletRequest; import javax.ws.rs.DELETE; import javax.ws.rs.DefaultValue; import javax.ws.rs.Encoded; @@ -42,8 +46,10 @@ import javax.ws.rs.WebApplicationException; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.container.Suspended; +import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import javax.ws.rs.core.StreamingOutput; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase; @@ -83,6 +89,7 @@ import org.apache.pulsar.common.policies.data.stats.PartitionedTopicStatsImpl; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1120,7 +1127,7 @@ public void deleteTopic( internalDeleteTopicAsync(authoritative, force) .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { - Throwable t = FutureUtil.unwrapCompletionException(ex); + Throwable t = unwrapCompletionException(ex); if (!force && (t instanceof BrokerServiceException.TopicBusyException)) { ex = new RestException(Response.Status.PRECONDITION_FAILED, t.getMessage()); @@ -1215,6 +1222,8 @@ public void getStats( }); } + @Context HttpServletRequest servletRequest; + @GET @Path("{tenant}/{namespace}/{topic}/internalStats") @ApiOperation(value = "Get the internal stats for the topic.", response = PersistentTopicInternalStats.class) @@ -1226,8 +1235,7 @@ public void getStats( @ApiResponse(code = 412, message = "Topic name is not valid"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 503, message = "Failed to validate global cluster configuration") }) - public void getInternalStats( - @Suspended final AsyncResponse asyncResponse, + public StreamingOutput getInternalStats( @ApiParam(value = "Specify the tenant", required = true) @PathParam("tenant") String tenant, @ApiParam(value = "Specify the namespace", required = true) @@ -1238,15 +1246,22 @@ public void getInternalStats( @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @QueryParam("metadata") @DefaultValue("false") boolean metadata) { validateTopicName(tenant, namespace, encodedTopic); - internalGetInternalStatsAsync(authoritative, metadata) - .thenAccept(asyncResponse::resume) - .exceptionally(ex -> { - if (isNot307And404Exception(ex)) { - log.error("[{}] Failed to get internal stats for topic {}", clientAppId(), topicName, ex); - } - resumeAsyncResponseExceptionally(asyncResponse, ex); - return null; - }); + return output -> { + internalGetInternalStatsAsync(authoritative, metadata) + .thenAccept(stats -> { + try { + ObjectMapperFactory.getMapper().getObjectMapper().writeValue(output, stats); + } catch (IOException error) { + throw new CompletionException(error); + } + }) + .exceptionally(ex -> { + if (isNot307And404Exception(ex)) { + log.error("[{}] Failed to get internal stats for topic {}", clientAppId(), topicName, ex); + } + throw translateToWebApplicationException(ex); + }); + }; } @GET diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index 2f437962002a3..4ec0a37ee5fe1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -1314,6 +1314,23 @@ public T sync(Supplier> supplier) { } } + protected static WebApplicationException translateToWebApplicationException(Throwable exception) { + Throwable realCause = FutureUtil.unwrapCompletionException(exception); + if (realCause instanceof WebApplicationException) { + return (WebApplicationException) realCause; + } else if (realCause instanceof BrokerServiceException.NotAllowedException) { + return new RestException(Status.CONFLICT, realCause); + } else if (realCause instanceof MetadataStoreException.NotFoundException) { + return new RestException(Status.NOT_FOUND, realCause); + } else if (realCause instanceof MetadataStoreException.BadVersionException) { + return new RestException(Status.CONFLICT, "Concurrent modification"); + } else if (realCause instanceof PulsarAdminException) { + return new RestException(((PulsarAdminException) realCause)); + } else { + return new RestException(realCause); + } + } + protected static void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Throwable exception) { Throwable realCause = FutureUtil.unwrapCompletionException(exception); if (realCause instanceof WebApplicationException) { From 6286b071c414dfd83e7e6c82051bd0d0a54a7b5a Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Tue, 16 Apr 2024 08:51:17 +0200 Subject: [PATCH 2/2] remove leftover --- .../org/apache/pulsar/broker/admin/v2/PersistentTopics.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 618a3ad2d6adc..8ed23b3a5b457 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -32,7 +32,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletionException; -import javax.servlet.http.HttpServletRequest; import javax.ws.rs.DELETE; import javax.ws.rs.DefaultValue; import javax.ws.rs.Encoded; @@ -46,7 +45,6 @@ import javax.ws.rs.WebApplicationException; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.container.Suspended; -import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.StreamingOutput; @@ -1222,8 +1220,6 @@ public void getStats( }); } - @Context HttpServletRequest servletRequest; - @GET @Path("{tenant}/{namespace}/{topic}/internalStats") @ApiOperation(value = "Get the internal stats for the topic.", response = PersistentTopicInternalStats.class)