Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Admin API: stream stats-internal with StreamingOutput #249

Open
wants to merge 2 commits into
base: 3.1_ds
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -47,6 +48,7 @@
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.Topic;
Expand All @@ -60,6 +62,7 @@
import org.apache.pulsar.common.policies.data.stats.NonPersistentPartitionedTopicStatsImpl;
import org.apache.pulsar.common.policies.data.stats.NonPersistentTopicStatsImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -111,8 +114,7 @@ public void getPartitionedMetadata(
@ApiResponse(code = 412, message = "Topic name is not valid"),
@ApiResponse(code = 500, message = "Internal server error"),
})
public void getInternalStats(
@Suspended final AsyncResponse asyncResponse,
public StreamingOutput getInternalStats(
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
Expand All @@ -123,21 +125,28 @@ public void getInternalStats(
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("metadata") @DefaultValue("false") boolean metadata) {
validateTopicName(tenant, namespace, encodedTopic);
validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.GET_STATS))
.thenCompose(__ -> {
Topic topic = getTopicReference(topicName);
boolean includeMetadata = metadata && hasSuperUserAccess();
return topic.getInternalStats(includeMetadata);
})
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get internal stats for topic {}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
return output -> {
validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.GET_STATS))
.thenCompose(__ -> {
Topic topic = getTopicReference(topicName);
boolean includeMetadata = metadata && hasSuperUserAccess();
return topic.getInternalStats(includeMetadata);
})
.thenAccept(stats -> {
try {
ObjectMapperFactory.getMapper().getObjectMapper().writeValue(output, stats);
} catch (Throwable e) {
throw new CompletionException(e);
}
})
.exceptionally(ex -> {
if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get internal stats for topic {}", clientAppId(), topicName, ex);
}
throw translateToWebApplicationException(ex);
});
};
}

@PUT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,20 @@
package org.apache.pulsar.broker.admin.v2;

import static org.apache.pulsar.common.util.Codec.decode;
import static org.apache.pulsar.common.util.FutureUtil.unwrapCompletionException;
import com.fasterxml.jackson.core.JsonProcessingException;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionException;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.Encoded;
Expand All @@ -42,8 +46,10 @@
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
Expand Down Expand Up @@ -83,6 +89,7 @@
import org.apache.pulsar.common.policies.data.stats.PartitionedTopicStatsImpl;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -1120,7 +1127,7 @@ public void deleteTopic(
internalDeleteTopicAsync(authoritative, force)
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
Throwable t = FutureUtil.unwrapCompletionException(ex);
Throwable t = unwrapCompletionException(ex);
if (!force && (t instanceof BrokerServiceException.TopicBusyException)) {
ex = new RestException(Response.Status.PRECONDITION_FAILED,
t.getMessage());
Expand Down Expand Up @@ -1215,6 +1222,8 @@ public void getStats(
});
}

@Context HttpServletRequest servletRequest;

@GET
@Path("{tenant}/{namespace}/{topic}/internalStats")
@ApiOperation(value = "Get the internal stats for the topic.", response = PersistentTopicInternalStats.class)
Expand All @@ -1226,8 +1235,7 @@ public void getStats(
@ApiResponse(code = 412, message = "Topic name is not valid"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration") })
public void getInternalStats(
@Suspended final AsyncResponse asyncResponse,
public StreamingOutput getInternalStats(
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
Expand All @@ -1238,15 +1246,22 @@ public void getInternalStats(
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("metadata") @DefaultValue("false") boolean metadata) {
validateTopicName(tenant, namespace, encodedTopic);
internalGetInternalStatsAsync(authoritative, metadata)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get internal stats for topic {}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
return output -> {
internalGetInternalStatsAsync(authoritative, metadata)
.thenAccept(stats -> {
try {
ObjectMapperFactory.getMapper().getObjectMapper().writeValue(output, stats);
} catch (IOException error) {
throw new CompletionException(error);
}
})
.exceptionally(ex -> {
if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get internal stats for topic {}", clientAppId(), topicName, ex);
}
throw translateToWebApplicationException(ex);
});
};
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1314,6 +1314,23 @@ public <T> T sync(Supplier<CompletableFuture<T>> supplier) {
}
}

protected static WebApplicationException translateToWebApplicationException(Throwable exception) {
Throwable realCause = FutureUtil.unwrapCompletionException(exception);
if (realCause instanceof WebApplicationException) {
return (WebApplicationException) realCause;
} else if (realCause instanceof BrokerServiceException.NotAllowedException) {
return new RestException(Status.CONFLICT, realCause);
} else if (realCause instanceof MetadataStoreException.NotFoundException) {
return new RestException(Status.NOT_FOUND, realCause);
} else if (realCause instanceof MetadataStoreException.BadVersionException) {
return new RestException(Status.CONFLICT, "Concurrent modification");
} else if (realCause instanceof PulsarAdminException) {
return new RestException(((PulsarAdminException) realCause));
} else {
return new RestException(realCause);
}
}

protected static void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Throwable exception) {
Throwable realCause = FutureUtil.unwrapCompletionException(exception);
if (realCause instanceof WebApplicationException) {
Expand Down
Loading