Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat][broker] Update and remove topic properties #4

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,7 @@ public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topicName,
case COMPACT:
case OFFLOAD:
case UNLOAD:
case DELETE_METADATA:
case ADD_BUNDLE_RANGE:
case GET_BUNDLE_RANGE:
case DELETE_BUNDLE_RANGE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerOfflineBacklog;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
Expand Down Expand Up @@ -654,6 +655,113 @@ private CompletableFuture<Map<String, String>> getPropertiesAsync() {
});
}

protected CompletableFuture<Void> internalUpdatePropertiesAsync(boolean authoritative,
Map<String, String> properties) {
if (properties == null || properties.isEmpty()) {
log.warn("[{}] [{}] properties is empty, ignore update", clientAppId(), topicName);
return CompletableFuture.completedFuture(null);
}
return validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.PRODUCE))
.thenCompose(__ -> {
if (topicName.isPartitioned()) {
return internalUpdateNonPartitionedTopicProperties(properties);
} else {
return pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName)
.thenCompose(metadata -> {
if (metadata.partitions == 0) {
return internalUpdateNonPartitionedTopicProperties(properties);
}
return namespaceResources()
.getPartitionedTopicResources().updatePartitionedTopicAsync(topicName,
p -> new PartitionedTopicMetadata(p.partitions,
p.properties == null ? properties
: MapUtils.putAll(p.properties, properties.entrySet().toArray())));
});
}
}).thenAccept(__ ->
log.info("[{}] [{}] update properties success with properties {}",
clientAppId(), topicName, properties));
}

private CompletableFuture<Void> internalUpdateNonPartitionedTopicProperties(Map<String, String> properties) {
CompletableFuture<Void> future = new CompletableFuture<>();
pulsar().getBrokerService().getTopicIfExists(topicName.toString())
.thenAccept(opt -> {
if (!opt.isPresent()) {
throw new RestException(Status.NOT_FOUND,
getTopicNotFoundErrorMessage(topicName.toString()));
}
ManagedLedger managedLedger = ((PersistentTopic) opt.get()).getManagedLedger();
managedLedger.asyncSetProperties(properties, new AsyncCallbacks.UpdatePropertiesCallback() {

@Override
public void updatePropertiesComplete(Map<String, String> properties, Object ctx) {
managedLedger.getConfig().getProperties().putAll(properties);
future.complete(null);
}

@Override
public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) {
future.completeExceptionally(exception);
}
}, null);
});
return future;
}

protected CompletableFuture<Void> internalRemovePropertiesAsync(boolean authoritative, String key) {
return validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.DELETE_METADATA))
.thenCompose(__ -> {
if (topicName.isPartitioned()) {
return internalRemoveNonPartitionedTopicProperties(key);
} else {
return pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName)
.thenCompose(metadata -> {
if (metadata.partitions == 0) {
return internalRemoveNonPartitionedTopicProperties(key);
}
return namespaceResources()
.getPartitionedTopicResources().updatePartitionedTopicAsync(topicName,
p -> {
if (p.properties != null) {
p.properties.remove(key);
}
return new PartitionedTopicMetadata(p.partitions, p.properties);
});
});
}
}).thenAccept(__ ->
log.info("[{}] remove [{}] properties success with key {}",
clientAppId(), topicName, key));
}

private CompletableFuture<Void> internalRemoveNonPartitionedTopicProperties(String key) {
CompletableFuture<Void> future = new CompletableFuture<>();
pulsar().getBrokerService().getTopicIfExists(topicName.toString())
.thenAccept(opt -> {
if (!opt.isPresent()) {
throw new RestException(Status.NOT_FOUND,
getTopicNotFoundErrorMessage(topicName.toString()));
}
ManagedLedger managedLedger = ((PersistentTopic) opt.get()).getManagedLedger();
managedLedger.asyncDeleteProperty(key, new AsyncCallbacks.UpdatePropertiesCallback() {

@Override
public void updatePropertiesComplete(Map<String, String> properties, Object ctx) {
future.complete(null);
}

@Override
public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) {
future.completeExceptionally(exception);
}
}, null);
});
return future;
}

protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boolean authoritative,
boolean force, boolean deleteSchema) {
validateTopicOwnershipAsync(topicName, authoritative)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,77 @@ public void getProperties(
});
}

@PUT
@Path("/{tenant}/{namespace}/{topic}/properties")
@ApiOperation(value = "Update the properties on the given topic.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or"
+ "subscriber is not authorized to access this operation"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic/Subscription does not exist"),
@ApiResponse(code = 405, message = "Method Not Allowed"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
})
public void updateProperties(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Key value pair properties for the topic metadata") Map<String, String> properties){
validatePersistentTopicName(tenant, namespace, encodedTopic);
internalUpdatePropertiesAsync(authoritative, properties)
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
log.error("[{}] Failed to update topic {} properties", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@DELETE
@Path("/{tenant}/{namespace}/{topic}/properties")
@ApiOperation(value = "Remove the key in properties on the given topic.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Partitioned topic does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 412, message = "Partitioned topic name is invalid"),
@ApiResponse(code = 500, message = "Internal server error")
})
public void removeProperties(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("key") String key,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validatePersistentTopicName(tenant, namespace, encodedTopic);
internalRemovePropertiesAsync(authoritative, key)
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
log.error("[{}] Failed to remove key {} in properties on topic {}",
clientAppId(), key, topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@DELETE
@Path("/{tenant}/{namespace}/{topic}/partitions")
@ApiOperation(value = "Delete a partitioned topic.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -902,6 +902,99 @@ public void testCreateAndGetTopicProperties() throws Exception {
Assert.assertEquals(properties22.get("key2"), "value2");
}

@Test
public void testUpdatePartitionedTopicProperties() throws Exception {
final String namespace = "prop-xyz/ns2";
final String topicName = "persistent://" + namespace + "/testUpdatePartitionedTopicProperties";
final String topicNameTwo = "persistent://" + namespace + "/testUpdatePartitionedTopicProperties2";
admin.namespaces().createNamespace(namespace, 20);

// create partitioned topic without properties
admin.topics().createPartitionedTopic(topicName, 2);
Map<String, String> properties = admin.topics().getProperties(topicName);
Assert.assertNull(properties);
Map<String, String> topicProperties = new HashMap<>();
topicProperties.put("key1", "value1");
admin.topics().updateProperties(topicName, topicProperties);
properties = admin.topics().getProperties(topicName);
Assert.assertNotNull(properties);
Assert.assertEquals(properties.get("key1"), "value1");

// update with new key, old properties should keep
topicProperties = new HashMap<>();
topicProperties.put("key2", "value2");
admin.topics().updateProperties(topicName, topicProperties);
properties = admin.topics().getProperties(topicName);
Assert.assertNotNull(properties);
Assert.assertEquals(properties.size(), 2);
Assert.assertEquals(properties.get("key1"), "value1");
Assert.assertEquals(properties.get("key2"), "value2");

// override old values
topicProperties = new HashMap<>();
topicProperties.put("key1", "value11");
admin.topics().updateProperties(topicName, topicProperties);
properties = admin.topics().getProperties(topicName);
Assert.assertNotNull(properties);
Assert.assertEquals(properties.size(), 2);
Assert.assertEquals(properties.get("key1"), "value11");
Assert.assertEquals(properties.get("key2"), "value2");

// create topic without properties
admin.topics().createPartitionedTopic(topicNameTwo, 2);
properties = admin.topics().getProperties(topicNameTwo);
Assert.assertNull(properties);
// remove key of properties on this topic
admin.topics().removeProperties(topicNameTwo, "key1");
properties = admin.topics().getProperties(topicNameTwo);
Assert.assertNull(properties);
Map<String, String> topicProp = new HashMap<>();
topicProp.put("key1", "value1");
topicProp.put("key2", "value2");
admin.topics().updateProperties(topicNameTwo, topicProp);
properties = admin.topics().getProperties(topicNameTwo);
Assert.assertEquals(properties, topicProp);
admin.topics().removeProperties(topicNameTwo, "key1");
topicProp.remove("key1");
properties = admin.topics().getProperties(topicNameTwo);
Assert.assertEquals(properties, topicProp);
}

@Test
public void testUpdateNonPartitionedTopicProperties() throws Exception {
final String namespace = "prop-xyz/ns2";
final String topicName = "persistent://" + namespace + "/testUpdateNonPartitionedTopicProperties";
admin.namespaces().createNamespace(namespace, 20);

// create non-partitioned topic with properties
Map<String, String> topicProperties = new HashMap<>();
topicProperties.put("key1", "value1");
admin.topics().createNonPartitionedTopic(topicName, topicProperties);
Map<String, String> properties = admin.topics().getProperties(topicName);
Assert.assertNotNull(properties);
Assert.assertEquals(properties.get("key1"), "value1");

// update with new key, old properties should keep
topicProperties = new HashMap<>();
topicProperties.put("key2", "value2");
admin.topics().updateProperties(topicName, topicProperties);
properties = admin.topics().getProperties(topicName);
Assert.assertNotNull(properties);
Assert.assertEquals(properties.size(), 2);
Assert.assertEquals(properties.get("key1"), "value1");
Assert.assertEquals(properties.get("key2"), "value2");

// override old values
topicProperties = new HashMap<>();
topicProperties.put("key1", "value11");
admin.topics().updateProperties(topicName, topicProperties);
properties = admin.topics().getProperties(topicName);
Assert.assertNotNull(properties);
Assert.assertEquals(properties.size(), 2);
Assert.assertEquals(properties.get("key1"), "value11");
Assert.assertEquals(properties.get("key2"), "value2");
}

@Test
public void testNonPersistentTopics() throws Exception {
final String namespace = "prop-xyz/ns2";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,42 @@ void updatePartitionedTopic(String topic, int numPartitions, boolean updateLocal
*/
CompletableFuture<Map<String, String>> getPropertiesAsync(String topic);

/**
* Update Topic Properties on a topic.
* The new properties will override the existing values, old properties in the topic will be keep if not override.
* @param topic
* @param properties
* @throws PulsarAdminException
*/
void updateProperties(String topic, Map<String, String> properties) throws PulsarAdminException;

/**
* Update Topic Properties on a topic.
* The new properties will override the existing values, old properties in the topic will be keep if not override.
* @param topic
* @param properties
* @return
*/
CompletableFuture<Void> updatePropertiesAsync(String topic, Map<String, String> properties);

/**
* Remove the key in properties on a topic.
*
* @param topic
* @param key
* @throws PulsarAdminException
*/
void removeProperties(String topic, String key) throws PulsarAdminException;

/**
* Remove the key in properties on a topic asynchronously.
*
* @param topic
* @param key
* @return
*/
CompletableFuture<Void> removePropertiesAsync(String topic, String key);

/**
* Delete a partitioned topic.
* <p/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,11 +474,39 @@ public void failed(Throwable throwable) {
return future;
}

@Override
public void updateProperties(String topic, Map<String, String> properties) throws PulsarAdminException {
sync(() -> updatePropertiesAsync(topic, properties));
}

@Override
public CompletableFuture<Void> updatePropertiesAsync(String topic, Map<String, String> properties) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "properties");
if (properties == null) {
properties = new HashMap<>();
}
return asyncPutRequest(path, Entity.entity(properties, MediaType.APPLICATION_JSON));
}

@Override
public void deletePartitionedTopic(String topic) throws PulsarAdminException {
deletePartitionedTopic(topic, false);
}

@Override
public void removeProperties(String topic, String key) throws PulsarAdminException {
sync(() -> removePropertiesAsync(topic, key));
}

@Override
public CompletableFuture<Void> removePropertiesAsync(String topic, String key) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "properties")
.queryParam("key", key);
return asyncDeleteRequest(path);
}

@Override
public CompletableFuture<Void> deletePartitionedTopicAsync(String topic) {
return deletePartitionedTopicAsync(topic, false);
Expand Down
Loading
Loading