Skip to content

Commit

Permalink
Admin API: stream stats
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli committed Apr 15, 2024
1 parent 36a93a3 commit cd71c72
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -47,6 +48,7 @@
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.Topic;
Expand All @@ -60,6 +62,7 @@
import org.apache.pulsar.common.policies.data.stats.NonPersistentPartitionedTopicStatsImpl;
import org.apache.pulsar.common.policies.data.stats.NonPersistentTopicStatsImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -111,8 +114,7 @@ public void getPartitionedMetadata(
@ApiResponse(code = 412, message = "Topic name is not valid"),
@ApiResponse(code = 500, message = "Internal server error"),
})
public void getInternalStats(
@Suspended final AsyncResponse asyncResponse,
public StreamingOutput getInternalStats(
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
Expand All @@ -123,21 +125,28 @@ public void getInternalStats(
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("metadata") @DefaultValue("false") boolean metadata) {
validateTopicName(tenant, namespace, encodedTopic);
validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.GET_STATS))
.thenCompose(__ -> {
Topic topic = getTopicReference(topicName);
boolean includeMetadata = metadata && hasSuperUserAccess();
return topic.getInternalStats(includeMetadata);
})
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get internal stats for topic {}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
return output -> {
validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.GET_STATS))
.thenCompose(__ -> {
Topic topic = getTopicReference(topicName);
boolean includeMetadata = metadata && hasSuperUserAccess();
return topic.getInternalStats(includeMetadata);
})
.thenAccept(stats -> {
try {
ObjectMapperFactory.getMapper().getObjectMapper().writeValue(output, stats);
} catch (Throwable e) {
throw new CompletionException(e);
}
})
.exceptionally(ex -> {
if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get internal stats for topic {}", clientAppId(), topicName, ex);
}
throw translateToWebApplicationException(ex);
});
};
}

@PUT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,20 @@
package org.apache.pulsar.broker.admin.v2;

import static org.apache.pulsar.common.util.Codec.decode;
import static org.apache.pulsar.common.util.FutureUtil.unwrapCompletionException;
import com.fasterxml.jackson.core.JsonProcessingException;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionException;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.Encoded;
Expand All @@ -42,8 +46,10 @@
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
Expand Down Expand Up @@ -83,6 +89,7 @@
import org.apache.pulsar.common.policies.data.stats.PartitionedTopicStatsImpl;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -1120,7 +1127,7 @@ public void deleteTopic(
internalDeleteTopicAsync(authoritative, force)
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
Throwable t = FutureUtil.unwrapCompletionException(ex);
Throwable t = unwrapCompletionException(ex);
if (!force && (t instanceof BrokerServiceException.TopicBusyException)) {
ex = new RestException(Response.Status.PRECONDITION_FAILED,
t.getMessage());
Expand Down Expand Up @@ -1215,6 +1222,8 @@ public void getStats(
});
}

@Context HttpServletRequest servletRequest;

@GET
@Path("{tenant}/{namespace}/{topic}/internalStats")
@ApiOperation(value = "Get the internal stats for the topic.", response = PersistentTopicInternalStats.class)
Expand All @@ -1226,8 +1235,7 @@ public void getStats(
@ApiResponse(code = 412, message = "Topic name is not valid"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration") })
public void getInternalStats(
@Suspended final AsyncResponse asyncResponse,
public StreamingOutput getInternalStats(
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
Expand All @@ -1238,15 +1246,22 @@ public void getInternalStats(
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("metadata") @DefaultValue("false") boolean metadata) {
validateTopicName(tenant, namespace, encodedTopic);
internalGetInternalStatsAsync(authoritative, metadata)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get internal stats for topic {}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
return output -> {
internalGetInternalStatsAsync(authoritative, metadata)
.thenAccept(stats -> {
try {
ObjectMapperFactory.getMapper().getObjectMapper().writeValue(output, stats);
} catch (IOException error) {
throw new CompletionException(error);
}
})
.exceptionally(ex -> {
if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get internal stats for topic {}", clientAppId(), topicName, ex);
}
throw translateToWebApplicationException(ex);
});
};
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1314,6 +1314,23 @@ public <T> T sync(Supplier<CompletableFuture<T>> supplier) {
}
}

protected static WebApplicationException translateToWebApplicationException(Throwable exception) {
Throwable realCause = FutureUtil.unwrapCompletionException(exception);
if (realCause instanceof WebApplicationException) {
return (WebApplicationException) realCause;
} else if (realCause instanceof BrokerServiceException.NotAllowedException) {
return new RestException(Status.CONFLICT, realCause);
} else if (realCause instanceof MetadataStoreException.NotFoundException) {
return new RestException(Status.NOT_FOUND, realCause);
} else if (realCause instanceof MetadataStoreException.BadVersionException) {
return new RestException(Status.CONFLICT, "Concurrent modification");
} else if (realCause instanceof PulsarAdminException) {
return new RestException(((PulsarAdminException) realCause));
} else {
return new RestException(realCause);
}
}

protected static void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Throwable exception) {
Throwable realCause = FutureUtil.unwrapCompletionException(exception);
if (realCause instanceof WebApplicationException) {
Expand Down

0 comments on commit cd71c72

Please sign in to comment.