diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 6c09bd982f3c5..af21951851007 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -148,13 +148,13 @@ protected CompletableFuture tryCreatePartitionsAsync(int numPartitions) { } List> futures = new ArrayList<>(numPartitions); for (int i = 0; i < numPartitions; i++) { - futures.add(tryCreatePartitionAsync(i, null)); + futures.add(tryCreatePartitionAsync(i)); } return FutureUtil.waitForAll(futures); } - private CompletableFuture tryCreatePartitionAsync(final int partition, CompletableFuture reuseFuture) { - CompletableFuture result = reuseFuture == null ? new CompletableFuture<>() : reuseFuture; + private CompletableFuture tryCreatePartitionAsync(final int partition) { + CompletableFuture result = new CompletableFuture<>(); getPulsarResources().getTopicResources().createPersistentTopicAsync(topicName.getPartition(partition)) .thenAccept(r -> { if (log.isDebugEnabled()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 13383cd30743c..d79fb65ce4ff7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -19,10 +19,10 @@ package org.apache.pulsar.broker.admin.impl; import static org.apache.pulsar.broker.PulsarService.isTransactionInternalName; -import static org.apache.pulsar.broker.resources.PulsarResources.DEFAULT_OPERATION_TIMEOUT_SEC; import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsTransactionCoordinatorAssign; import com.fasterxml.jackson.core.JsonProcessingException; import com.github.zafarkhaja.semver.Version; +import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -46,6 +46,7 @@ import java.util.function.BiConsumer; import java.util.stream.Collectors; import java.util.stream.IntStream; +import javax.annotation.Nonnull; import javax.ws.rs.WebApplicationException; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.core.Response; @@ -449,94 +450,161 @@ protected void internalCreateNonPartitionedTopic(boolean authoritative, Map 0 && numPartitions > maxPartitions) { - throw new RestException(Status.NOT_ACCEPTABLE, - "Number of partitions should be less than or equal to " + maxPartitions); - } - - if (topicName.isGlobal() && isNamespaceReplicated(topicName.getNamespaceObject())) { - Set clusters = getNamespaceReplicatedClusters(topicName.getNamespaceObject()); - if (!clusters.contains(pulsar().getConfig().getClusterName())) { - log.error("[{}] local cluster is not part of replicated cluster for namespace {}", clientAppId(), - topicName); - throw new RestException(Status.FORBIDDEN, "Local cluster is not part of replicate cluster list"); - } - try { - tryCreatePartitionsAsync(numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS); - createSubscriptions(topicName, numPartitions, force).get(DEFAULT_OPERATION_TIMEOUT_SEC, - TimeUnit.SECONDS); - } catch (Exception e) { - if (e.getCause() instanceof RestException) { - throw (RestException) e.getCause(); + protected @Nonnull CompletableFuture internalUpdatePartitionedTopicAsync(int expectPartitions, + boolean updateLocal, + boolean force) { + PulsarService pulsarService = pulsar(); + return pulsarService.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName) + .thenCompose(partitionedTopicMetadata -> { + int currentMetadataPartitions = partitionedTopicMetadata.partitions; + if (currentMetadataPartitions <= 0) { + throw new RestException(422 /* Unprocessable entity*/, + String.format("Topic %s is not the partitioned topic.", topicName)); } - log.error("[{}] Failed to update partitioned topic {}", clientAppId(), topicName, e); - throw new RestException(e); - } - // if this cluster is the first hop which needs to coordinate with other clusters then update partitions in - // other clusters and then update number of partitions. - if (!updateLocalTopicOnly) { - CompletableFuture updatePartition = new CompletableFuture<>(); - updatePartitionInOtherCluster(numPartitions, clusters).thenRun(() -> { - try { - namespaceResources().getPartitionedTopicResources() - .updatePartitionedTopicAsync(topicName, p -> - new PartitionedTopicMetadata(numPartitions) - ).thenAccept(r -> updatePartition.complete(null)).exceptionally(ex -> { - updatePartition.completeExceptionally(ex.getCause()); - return null; - }); - } catch (Exception e) { - updatePartition.completeExceptionally(e); - } - }).exceptionally(ex -> { - updatePartition.completeExceptionally(ex); - return null; - }); + if (expectPartitions < currentMetadataPartitions) { + throw new RestException(422 /* Unprocessable entity*/, + String.format("Expect partitions %s can't less than current partitions %s.", + expectPartitions, currentMetadataPartitions)); + } + int brokerMaximumPartitionsPerTopic = pulsarService.getConfiguration() + .getMaxNumPartitionsPerPartitionedTopic(); + if (brokerMaximumPartitionsPerTopic != 0 && expectPartitions > brokerMaximumPartitionsPerTopic) { + throw new RestException(422 /* Unprocessable entity*/, + String.format("Expect partitions %s grater than maximum partitions per topic %s", + expectPartitions, brokerMaximumPartitionsPerTopic)); + } + final PulsarAdmin admin; try { - updatePartition.get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS); - } catch (Exception e) { - log.error("{} Failed to update number of partitions in zk for topic {} and partitions {}", - clientAppId(), topicName, numPartitions, e); - if (e.getCause() instanceof RestException) { - throw (RestException) e.getCause(); - } - throw new RestException(e); + admin = pulsarService.getAdminClient(); + } catch (PulsarServerException ex) { + throw new RestException(Status.INTERNAL_SERVER_ERROR, Throwables.getRootCause(ex)); } - } - return; - } - - try { - tryCreatePartitionsAsync(numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS); - updatePartitionedTopic(topicName, numPartitions, force).get(DEFAULT_OPERATION_TIMEOUT_SEC, - TimeUnit.SECONDS); - } catch (Exception e) { - if (e.getCause() instanceof RestException) { - throw (RestException) e.getCause(); - } - log.error("[{}] Failed to update partitioned topic {}", clientAppId(), topicName, e); - throw new RestException(e); - } + final CompletableFuture checkFuture; + if (!force) { + checkFuture = admin.topics().getListAsync(topicName.getNamespace(), topicName.getDomain()) + .thenAccept(topics -> { + final List existingPartitions = topics.stream() + .map(TopicName::get) + .filter(candidateTopicName -> candidateTopicName.getPartitionedTopicName() + .equals(topicName.getPartitionedTopicName())) + .collect(Collectors.toList()); + // Check whether exist unexpected partition + Optional maximumPartitionIndex = existingPartitions.stream() + .map(TopicName::getPartitionIndex) + .max(Integer::compareTo); + if (maximumPartitionIndex.isPresent() + && maximumPartitionIndex.get() >= currentMetadataPartitions) { + List unexpectedPartitions = existingPartitions.stream() + .filter(candidateTopicName -> + candidateTopicName + .getPartitionIndex() > currentMetadataPartitions) + .map(TopicName::toString) + .collect(Collectors.toList()); + throw new RestException(Status.CONFLICT, + String.format( + "Exist unexpected topic partition(partition index grater than" + + " current metadata maximum index %s) %s ", + currentMetadataPartitions, + StringUtils.join(unexpectedPartitions, ","))); + } + }); + } else { + checkFuture = CompletableFuture.completedFuture(null); + } + return checkFuture.thenCompose(topics -> { + final CompletableFuture updateMetadataFuture = (expectPartitions == currentMetadataPartitions) + // current metadata partitions is equals to expect partitions + ? CompletableFuture.completedFuture(null) + // update current cluster topic metadata + : namespaceResources().getPartitionedTopicResources() + .updatePartitionedTopicAsync(topicName, m -> + new PartitionedTopicMetadata(expectPartitions, m.properties)); + return updateMetadataFuture + // create missing partitions + .thenCompose(__ -> tryCreatePartitionsAsync(expectPartitions)) + // because we should consider the compatibility. + // Copy subscriptions from partition 0 instead of being created by the customer + .thenCompose(__ -> + admin.topics().getStatsAsync(topicName.getPartition(0).toString()) + .thenCompose(stats -> { + List> futures = stats.getSubscriptions().entrySet() + // We must not re-create non-durable subscriptions on the new partitions + .stream().filter(entry -> entry.getValue().isDurable()) + .map(entry -> { + final List> innerFutures = + new ArrayList<>(expectPartitions); + for (int i = 0; i < expectPartitions; i++) { + innerFutures.add(admin.topics().createSubscriptionAsync( + topicName.getPartition(i).toString(), + entry.getKey(), MessageId.earliest, + entry.getValue().isReplicated(), + entry.getValue().getSubscriptionProperties()) + .exceptionally(ex -> { + Throwable rc = + FutureUtil.unwrapCompletionException(ex); + if (!(rc instanceof PulsarAdminException + .ConflictException)) { + log.warn("[{}] got an error while copying" + + " the subscription to the" + + " partition {}.", topicName, + Throwables.getRootCause(rc)); + throw FutureUtil.wrapToCompletionException(rc); + } + // Ignore subscription already exist exception + return null; + })); + } + return FutureUtil.waitForAll(innerFutures); + }).collect(Collectors.toList()); + return FutureUtil.waitForAll(futures); + }) + ); + }).thenCompose(__ -> { + if (updateLocal || !topicName.isGlobal()) { + return CompletableFuture.completedFuture(null); + } + // update remote cluster + return namespaceResources().getPoliciesAsync(namespaceName) + .thenCompose(policies -> { + if (!policies.isPresent()) { + return CompletableFuture.completedFuture(null); + } + final Set replicationClusters = policies.get().replication_clusters; + if (replicationClusters.size() == 0) { + return CompletableFuture.completedFuture(null); + } + boolean containsCurrentCluster = + replicationClusters.contains(pulsar().getConfig().getClusterName()); + if (!containsCurrentCluster) { + log.error("[{}] local cluster is not part of replicated cluster for namespace {}", + clientAppId(), topicName); + throw new RestException(422, + "Local cluster is not part of replicate cluster list"); + } + if (replicationClusters.size() == 1) { + // The replication clusters just has the current cluster itself. + return CompletableFuture.completedFuture(null); + } + List> futures = replicationClusters.stream() + .map(replicationCluster -> admin.clusters().getClusterAsync(replicationCluster) + .thenCompose(clusterData -> pulsarService.getBrokerService() + .getClusterPulsarAdmin(replicationCluster, Optional.of(clusterData)) + .topics().updatePartitionedTopicAsync(topicName.toString(), + expectPartitions, true, force) + .exceptionally(ex -> { + log.warn("[{}][{}] Update remote cluster partition fail.", + topicName, replicationCluster, ex); + throw FutureUtil.wrapToCompletionException(ex); + }) + ) + ).collect(Collectors.toList()); + return FutureUtil.waitForAll(futures); + }); + }); + }); } protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) { @@ -4352,127 +4420,101 @@ private PersistentReplicator getReplicatorReference(String replName, PersistentT } } - - private CompletableFuture updatePartitionedTopic(TopicName topicName, int numPartitions, boolean force) { - CompletableFuture result = new CompletableFuture<>(); - createSubscriptions(topicName, numPartitions, force).thenCompose(__ -> { - CompletableFuture future = namespaceResources().getPartitionedTopicResources() - .updatePartitionedTopicAsync(topicName, p -> new PartitionedTopicMetadata(numPartitions)); - future.exceptionally(ex -> { - // If the update operation fails, clean up the partitions that were created - getPartitionedTopicMetadataAsync(topicName, false, false).thenAccept(metadata -> { - int oldPartition = metadata.partitions; - for (int i = oldPartition; i < numPartitions; i++) { - topicResources().deletePersistentTopicAsync(topicName.getPartition(i)).exceptionally(ex1 -> { - log.warn("[{}] Failed to clean up managedLedger {}", clientAppId(), topicName, - ex1.getCause()); - return null; - }); - } - }).exceptionally(e -> { - log.warn("[{}] Failed to clean up managedLedger", topicName, e); - return null; - }); + private CompletableFuture updatePartitionedTopic(TopicName topicName, int expectPartitions) { + CompletableFuture future = namespaceResources().getPartitionedTopicResources() + .updatePartitionedTopicAsync(topicName, p -> + new PartitionedTopicMetadata(expectPartitions, p.properties)); + future.exceptionally(ex -> { + // If the update operation fails, clean up the partitions that were created + getPartitionedTopicMetadataAsync(topicName, false, false) + .thenAccept(metadata -> { + int oldPartition = metadata.partitions; + for (int i = oldPartition; i < expectPartitions; i++) { + topicResources().deletePersistentTopicAsync(topicName.getPartition(i)).exceptionally(ex1 -> { + log.warn("[{}] Failed to clean up managedLedger {}", clientAppId(), topicName, + ex1.getCause()); + return null; + }); + } + }).exceptionally(e -> { + log.warn("[{}] Failed to clean up managedLedger", topicName, e); return null; }); - return future; - }).thenAccept(__ -> result.complete(null)).exceptionally(ex -> { - if (force && ex.getCause() instanceof PulsarAdminException.ConflictException) { - result.complete(null); - return null; - } - result.completeExceptionally(ex); return null; }); - return result; + return future.thenCompose(__ -> createSubscriptions(topicName, expectPartitions)); } /** * It creates subscriptions for new partitions of existing partitioned-topics. * * @param topicName : topic-name: persistent://prop/cluster/ns/topic - * @param numPartitions : number partitions for the topics - * @param ignoreConflictException : If true, ignore ConflictException: subscription already exists for topic + * @param expectPartitions : number of expected partitions + * */ - private CompletableFuture createSubscriptions(TopicName topicName, int numPartitions, - boolean ignoreConflictException) { + private CompletableFuture createSubscriptions(TopicName topicName, int expectPartitions) { CompletableFuture result = new CompletableFuture<>(); - pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).thenAccept(partitionMetadata -> { - if (partitionMetadata.partitions < 1) { - result.completeExceptionally(new RestException(Status.CONFLICT, "Topic is not partitioned topic")); - return; - } - - if (partitionMetadata.partitions >= numPartitions) { - result.completeExceptionally(new RestException(Status.CONFLICT, - "number of partitions must be more than existing " + partitionMetadata.partitions)); - return; - } - - PulsarAdmin admin; - try { - admin = pulsar().getAdminClient(); - } catch (PulsarServerException e1) { - result.completeExceptionally(e1); - return; - } - - admin.topics().getStatsAsync(topicName.getPartition(0).toString()).thenAccept(stats -> { - List> subscriptionFutures = new ArrayList<>(); + if (expectPartitions < 1) { + return FutureUtil.failedFuture(new RestException(Status.CONFLICT, "Topic is not partitioned topic")); + } + PulsarAdmin admin; + try { + admin = pulsar().getAdminClient(); + } catch (PulsarServerException e1) { + return FutureUtil.failedFuture(e1); + } - stats.getSubscriptions().entrySet().forEach(e -> { - String subscription = e.getKey(); - SubscriptionStats ss = e.getValue(); - if (!ss.isDurable()) { - // We must not re-create non-durable subscriptions on the new partitions - return; - } - boolean replicated = ss.isReplicated(); + admin.topics().getStatsAsync(topicName.getPartition(0).toString()).thenAccept(stats -> { + List> subscriptionFutures = new ArrayList<>(); - for (int i = partitionMetadata.partitions; i < numPartitions; i++) { - final String topicNamePartition = topicName.getPartition(i).toString(); - CompletableFuture future = new CompletableFuture<>(); - admin.topics().createSubscriptionAsync(topicNamePartition, - subscription, MessageId.earliest, replicated, ss.getSubscriptionProperties()) - .whenComplete((__, ex) -> { - if (ex == null) { + stats.getSubscriptions().entrySet().forEach(e -> { + String subscription = e.getKey(); + SubscriptionStats ss = e.getValue(); + if (!ss.isDurable()) { + // We must not re-create non-durable subscriptions on the new partitions + return; + } + boolean replicated = ss.isReplicated(); + + for (int i = 0; i < expectPartitions; i++) { + final String topicNamePartition = topicName.getPartition(i).toString(); + CompletableFuture future = new CompletableFuture<>(); + admin.topics().createSubscriptionAsync(topicNamePartition, + subscription, MessageId.earliest, replicated, ss.getSubscriptionProperties()) + .whenComplete((__, ex) -> { + if (ex == null) { + future.complete(null); + } else { + Throwable realCause = FutureUtil.unwrapCompletionException(ex); + if (realCause instanceof PulsarAdminException.ConflictException) { future.complete(null); } else { - if (ignoreConflictException && ex instanceof PulsarAdminException.ConflictException) { - future.complete(null); - } else { - future.completeExceptionally(ex); - } + future.completeExceptionally(realCause); } - }); - subscriptionFutures.add(future); - } - }); + } + }); + subscriptionFutures.add(future); + } + }); - FutureUtil.waitForAll(subscriptionFutures).thenRun(() -> { - log.info("[{}] Successfully created subscriptions on new partitions {}", clientAppId(), topicName); - result.complete(null); - }).exceptionally(ex -> { - log.warn("[{}] Failed to create subscriptions on new partitions for {}", - clientAppId(), topicName, ex); - result.completeExceptionally(ex); - return null; - }); + FutureUtil.waitForAll(subscriptionFutures).thenRun(() -> { + log.info("[{}] Successfully created subscriptions on new partitions {}", clientAppId(), topicName); + result.complete(null); }).exceptionally(ex -> { - if (ex.getCause() instanceof PulsarAdminException.NotFoundException) { - // The first partition doesn't exist, so there are currently to subscriptions to recreate - result.complete(null); - } else { - log.warn("[{}] Failed to get list of subscriptions of {}", - clientAppId(), topicName.getPartition(0), ex); - result.completeExceptionally(ex); - } + log.warn("[{}] Failed to create subscriptions on new partitions for {}", + clientAppId(), topicName, ex); + result.completeExceptionally(ex); return null; }); }).exceptionally(ex -> { - log.warn("[{}] Failed to get partition metadata for {}", - clientAppId(), topicName.toString()); - result.completeExceptionally(ex); + if (ex.getCause() instanceof PulsarAdminException.NotFoundException) { + // The first partition doesn't exist, so there are currently to subscriptions to recreate + result.complete(null); + } else { + log.warn("[{}] Failed to get list of subscriptions of {}", + clientAppId(), topicName.getPartition(0), ex); + result.completeExceptionally(ex); + } return null; }); return result; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java index f246b00419d9b..3fe2457c7728f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java @@ -53,6 +53,8 @@ import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; +import org.apache.pulsar.common.policies.data.PolicyName; +import org.apache.pulsar.common.policies.data.PolicyOperation; import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; @@ -220,35 +222,48 @@ public void createNonPartitionedTopic( * It updates number of partitions of an existing non-global partitioned topic. It requires partitioned-topic to be * already exist and number of new partitions must be greater than existing number of partitions. Decrementing * number of partitions requires deletion of topic which is not supported. - * - * Already created partitioned producers and consumers can't see newly created partitions and it requires to - * recreate them at application so, newly created producers and consumers can connect to newly added partitions as - * well. Therefore, it can violate partition ordering at producers until all producers are restarted at application. - * - * @param property - * @param cluster - * @param namespace - * @param numPartitions */ @POST @Path("/{property}/{cluster}/{namespace}/{topic}/partitions") @ApiOperation(hidden = true, value = "Increment partitons of an existing partitioned topic.", notes = "It only increments partitions of existing non-global partitioned-topic") @ApiResponses(value = { + @ApiResponse(code = 204, message = "Update topic partition successful."), @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), - @ApiResponse(code = 403, message = "Don't have admin permission"), - @ApiResponse(code = 406, message = "The number of partitions should be more than 0" - + " and less than or equal to maxNumPartitionsPerPartitionedTopic"), - @ApiResponse(code = 409, message = "Partitioned topic does not exist")}) - public void updatePartitionedTopic(@PathParam("property") String property, @PathParam("cluster") String cluster, + @ApiResponse(code = 401, message = "Unauthenticated"), + @ApiResponse(code = 403, message = "Forbidden/Unauthorized"), + @ApiResponse(code = 404, message = "Topic does not exist"), + @ApiResponse(code = 422, message = "The number of partitions should be more than 0 and" + + " less than or equal to maxNumPartitionsPerPartitionedTopic" + + " and number of new partitions must be greater than existing number of partitions"), + @ApiResponse(code = 412, message = "Partitioned topic name is invalid"), + @ApiResponse(code = 500, message = "Internal server error") + }) + public void updatePartitionedTopic( + @Suspended final AsyncResponse asyncResponse, + @PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, - @QueryParam("updateLocalTopicOnly") @DefaultValue("false") boolean updateLocalTopicOnly, + @QueryParam("updateLocalTopicOnly") @DefaultValue("false") boolean updateLocalTopic, @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @QueryParam("force") @DefaultValue("false") boolean force, int numPartitions) { validateTopicName(property, cluster, namespace, encodedTopic); - internalUpdatePartitionedTopic(numPartitions, updateLocalTopicOnly, authoritative, force); + validateTopicOwnership(topicName, authoritative); + validateTopicPolicyOperation(topicName, PolicyName.PARTITION, PolicyOperation.WRITE); + internalUpdatePartitionedTopicAsync(numPartitions, updateLocalTopic, force) + .thenAccept(__ -> { + log.info("[{}][{}] Updated topic partition to {}.", clientAppId(), topicName, numPartitions); + asyncResponse.resume(Response.noContent().build()); + }) + .exceptionally(ex -> { + if (!isRedirectException(ex)) { + log.error("[{}][{}] Failed to update partition to {}", + clientAppId(), topicName, numPartitions, ex); + } + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @GET diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index fceefa155fe3d..3dfc28b7504fa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -784,40 +784,32 @@ public void deleteDelayedDeliveryPolicies(@Suspended final AsyncResponse asyncRe * It updates number of partitions of an existing non-global partitioned topic. It requires partitioned-topic to be * already exist and number of new partitions must be greater than existing number of partitions. Decrementing * number of partitions requires deletion of topic which is not supported. - * - * Already created partitioned producers and consumers can't see newly created partitions and it requires to - * recreate them at application so, newly created producers and consumers can connect to newly added partitions as - * well. Therefore, it can violate partition ordering at producers until all producers are restarted at application. - * - * @param tenant - * @param namespace - * @param encodedTopic - * @param numPartitions */ @POST @Path("/{tenant}/{namespace}/{topic}/partitions") @ApiOperation(value = "Increment partitions of an existing partitioned topic.", notes = "It only increments partitions of existing non-global partitioned-topic") @ApiResponses(value = { + @ApiResponse(code = 204, message = "Update topic partition successful."), @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), - @ApiResponse(code = 401, - message = "Don't have permission to administrate resources on this tenant"), - @ApiResponse(code = 403, message = "Don't have admin permission"), - @ApiResponse(code = 404, message = "Tenant does not exist"), - @ApiResponse(code = 406, message = "The number of partitions should be more than 0 and" - + " less than or equal to maxNumPartitionsPerPartitionedTopic"), - @ApiResponse(code = 409, message = "Partitioned topic does not exist"), + @ApiResponse(code = 401, message = "Unauthenticated"), + @ApiResponse(code = 403, message = "Forbidden/Unauthorized"), + @ApiResponse(code = 404, message = "Topic does not exist"), + @ApiResponse(code = 422, message = "The number of partitions should be more than 0 and" + + " less than or equal to maxNumPartitionsPerPartitionedTopic" + + " and number of new partitions must be greater than existing number of partitions"), @ApiResponse(code = 412, message = "Partitioned topic name is invalid"), @ApiResponse(code = 500, message = "Internal server error") }) public void updatePartitionedTopic( + @Suspended final AsyncResponse asyncResponse, @ApiParam(value = "Specify the tenant", required = true) @PathParam("tenant") String tenant, @ApiParam(value = "Specify the namespace", required = true) @PathParam("namespace") String namespace, @ApiParam(value = "Specify topic name", required = true) @PathParam("topic") @Encoded String encodedTopic, - @QueryParam("updateLocalTopicOnly") @DefaultValue("false") boolean updateLocalTopicOnly, + @QueryParam("updateLocalTopicOnly") @DefaultValue("false") boolean updateLocalTopic, @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @QueryParam("force") @DefaultValue("false") boolean force, @@ -825,8 +817,21 @@ public void updatePartitionedTopic( required = true, type = "int", defaultValue = "0") int numPartitions) { validatePartitionedTopicName(tenant, namespace, encodedTopic); - validatePartitionedTopicMetadata(); - internalUpdatePartitionedTopic(numPartitions, updateLocalTopicOnly, authoritative, force); + validateTopicOwnership(topicName, authoritative); + validateTopicPolicyOperation(topicName, PolicyName.PARTITION, PolicyOperation.WRITE); + internalUpdatePartitionedTopicAsync(numPartitions, updateLocalTopic, force) + .thenAccept(__ -> { + log.info("[{}][{}] Updated topic partition to {}.", clientAppId(), topicName, numPartitions); + asyncResponse.resume(Response.noContent().build()); + }) + .exceptionally(ex -> { + if (!isRedirectException(ex)) { + log.error("[{}][{}] Failed to update partition to {}", + clientAppId(), topicName, numPartitions, ex); + } + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java index cdca5f33a5bf6..9a45580ba1b4e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java @@ -914,8 +914,13 @@ public void testUpdatePartitionedTopicCoontainedInOldTopic() throws Exception { verify(response2, timeout(5000).times(1)).resume(responseCaptor.capture()); Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); - persistentTopics.updatePartitionedTopic(property, cluster, namespace, partitionedTopicName2, false, false, + AsyncResponse response3 = mock(AsyncResponse.class); + responseCaptor = ArgumentCaptor.forClass(Response.class); + persistentTopics.updatePartitionedTopic(response3, property, cluster, namespace, partitionedTopicName2, false, + false, false, 10); + verify(response3, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Status.NO_CONTENT.getStatusCode()); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index afc7c86d6fecf..9cf1ed2a6f9be 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -110,6 +110,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { private final String testTenant = "my-tenant"; private final String testLocalCluster = "use"; private final String testNamespace = "my-namespace"; + private final String testNamespaceLocal = "my-namespace-local"; protected Field uriField; protected UriInfo uriInfo; private NonPersistentTopics nonPersistentTopic; @@ -157,6 +158,7 @@ protected void setup() throws Exception { admin.tenants().createTenant(this.testTenant, new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet(testLocalCluster, "test"))); admin.namespaces().createNamespace(testTenant + "/" + testNamespace, Sets.newHashSet(testLocalCluster, "test")); + admin.namespaces().createNamespace(testTenant + "/" + testNamespaceLocal); } @Override @@ -555,7 +557,7 @@ public void testCreatePartitionedTopicHavingNonPartitionTopicWithPartitionSuffix Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(), Response.Status.CONFLICT.getStatusCode()); } - @Test(expectedExceptions = RestException.class) + @Test public void testUpdatePartitionedTopicHavingNonPartitionTopicWithPartitionSuffix() throws Exception { // Already have non partition topic special-topic-partition-10, shouldn't able to update number of partitioned topic to more than 10. final String nonPartitionTopicName2 = "special-topic-partition-10"; @@ -573,8 +575,14 @@ public void testUpdatePartitionedTopicHavingNonPartitionTopicWithPartitionSuffix persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, 5, true); verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); - persistentTopics.updatePartitionedTopic(testTenant, testNamespace, partitionedTopicName, false, false, false, + + response = mock(AsyncResponse.class); + ArgumentCaptor errorCaptor = ArgumentCaptor.forClass(RestException.class); + persistentTopics.updatePartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, false, false, + false, 10); + verify(response, timeout(5000).times(1)).resume(errorCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); } @Test(timeOut = 10_000) @@ -1286,6 +1294,62 @@ public void testResetCursorReturnTimeoutWhenZKTimeout() { } } + @Test + public void testUpdatePartitionedTopic() + throws KeeperException, InterruptedException, PulsarAdminException { + String topicName = "testUpdatePartitionedTopic"; + String groupName = "cg_testUpdatePartitionedTopic"; + AsyncResponse response = mock(AsyncResponse.class); + ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(Response.class); + persistentTopics.createPartitionedTopic(response, testTenant, testNamespaceLocal, topicName, 2, true); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); + + response = mock(AsyncResponse.class); + persistentTopics.createSubscription(response, testTenant, testNamespaceLocal, topicName, groupName, true, + new ResetCursorData(MessageId.latest), false); + responseCaptor = ArgumentCaptor.forClass(Response.class); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); + + response = mock(AsyncResponse.class); + ArgumentCaptor metaCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class); + persistentTopics.getPartitionedMetadata(response, testTenant, testNamespaceLocal, topicName, true, false); + verify(response, timeout(5000).times(1)).resume(metaCaptor.capture()); + PartitionedTopicMetadata partitionedTopicMetadata = metaCaptor.getValue(); + Assert.assertEquals(partitionedTopicMetadata.partitions, 2); + + doNothing().when(persistentTopics).validatePartitionedTopicName(any(), any(), any()); + response = mock(AsyncResponse.class); + responseCaptor = ArgumentCaptor.forClass(Response.class); + persistentTopics.updatePartitionedTopic(response, testTenant, testNamespaceLocal, topicName, false, true, + true, 4); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + + response = mock(AsyncResponse.class); + metaCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class); + persistentTopics.getPartitionedMetadata(response, testTenant, testNamespaceLocal, topicName, true, false); + verify(response, timeout(5000).times(1)).resume(metaCaptor.capture()); + partitionedTopicMetadata = metaCaptor.getValue(); + Assert.assertEquals(partitionedTopicMetadata.partitions, 4); + + // check number of new partitions must be greater than existing number of partitions + response = mock(AsyncResponse.class); + ArgumentCaptor throwableCaptor = ArgumentCaptor.forClass(Throwable.class); + persistentTopics.updatePartitionedTopic(response, testTenant, testNamespaceLocal, topicName, false, true, + true, 3); + verify(response, timeout(5000).times(1)).resume(throwableCaptor.capture()); + Assert.assertEquals(throwableCaptor.getValue().getMessage(), + "Expect partitions 3 can't less than current partitions 4."); + + response = mock(AsyncResponse.class); + metaCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class); + persistentTopics.getPartitionedMetadata(response, testTenant, testNamespaceLocal, topicName, true, false); + verify(response, timeout(5000).times(1)).resume(metaCaptor.capture()); + partitionedTopicMetadata = metaCaptor.getValue(); + Assert.assertEquals(partitionedTopicMetadata.partitions, 4); + } + @Test public void testInternalGetReplicatedSubscriptionStatusFromLocal() throws Exception { String topicName = "persistent://" + testTenant + "/" + testNamespace