diff --git a/cli/src/main/java/com/automq/rocketmq/cli/ConsoleHelper.java b/cli/src/main/java/com/automq/rocketmq/cli/ConsoleHelper.java new file mode 100644 index 000000000..511331730 --- /dev/null +++ b/cli/src/main/java/com/automq/rocketmq/cli/ConsoleHelper.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.rocketmq.cli; + +import apache.rocketmq.controller.v1.MessageQueueAssignment; +import apache.rocketmq.controller.v1.OngoingMessageQueueReassignment; +import apache.rocketmq.controller.v1.Topic; +import de.vandermeer.asciitable.AT_Row; +import de.vandermeer.asciitable.AsciiTable; +import de.vandermeer.skb.interfaces.transformers.textformat.TextAlignment; +import java.util.List; + +public class ConsoleHelper { + public static void printTable(Topic topic) { + AsciiTable topicTable = new AsciiTable(); + topicTable.addRule(); + topicTable.addRow("TOPIC ID", "TOPIC NAME"); + topicTable.addRule(); + topicTable.addRow(topic.getTopicId(), topic.getName()); + topicTable.addRule(); + String render = topicTable.render(); + System.out.println(render); + + AsciiTable assignmentTable = new AsciiTable(); + assignmentTable.addRule(); + AT_Row row = assignmentTable.addRow(null, "ASSIGNMENT"); + row.getCells().get(1).getContext().setTextAlignment(TextAlignment.CENTER); + assignmentTable.addRule(); + assignmentTable.addRow("NODE ID", "QUEUE ID"); + assignmentTable.addRule(); + for (MessageQueueAssignment assignment : topic.getAssignmentsList()) { + assignmentTable.addRow(assignment.getNodeId(), assignment.getQueue().getQueueId()); + assignmentTable.addRule(); + } + render = assignmentTable.render(); + System.out.println(render); + + List ongoing = topic.getReassignmentsList(); + if (!ongoing.isEmpty()) { + AsciiTable reassignmentTable = new AsciiTable(); + assignmentTable.addRule(); + row = assignmentTable.addRow(null, "ON-GOING REASSIGNMENT"); + row.getCells().get(1).getContext().setTextAlignment(TextAlignment.CENTER); + reassignmentTable.addRule(); + reassignmentTable.addRow("SRC NODE ID", "DST NODE ID", "QUEUE ID"); + reassignmentTable.addRule(); + for (OngoingMessageQueueReassignment reassignment : ongoing) { + reassignmentTable.addRow(reassignment.getSrcNodeId(), reassignment.getDstNodeId(), reassignment.getQueue().getQueueId()); + reassignmentTable.addRule(); + } + } + } +} diff --git a/cli/src/main/java/com/automq/rocketmq/cli/DescribeTopic.java b/cli/src/main/java/com/automq/rocketmq/cli/DescribeTopic.java index 00737fd51..43748600c 100644 --- a/cli/src/main/java/com/automq/rocketmq/cli/DescribeTopic.java +++ b/cli/src/main/java/com/automq/rocketmq/cli/DescribeTopic.java @@ -17,14 +17,8 @@ package com.automq.rocketmq.cli; -import apache.rocketmq.controller.v1.MessageQueueAssignment; -import apache.rocketmq.controller.v1.OngoingMessageQueueReassignment; import apache.rocketmq.controller.v1.Topic; import com.automq.rocketmq.controller.metadata.GrpcControllerClient; -import de.vandermeer.asciitable.AT_Row; -import de.vandermeer.asciitable.AsciiTable; -import de.vandermeer.skb.interfaces.transformers.textformat.TextAlignment; -import java.util.List; import java.util.concurrent.Callable; import picocli.CommandLine; @@ -46,44 +40,7 @@ public Void call() throws Exception { System.err.printf("Topic '%s' is not found%n%n", topicName); return null; } - AsciiTable topicTable = new AsciiTable(); - topicTable.addRule(); - topicTable.addRow("TOPIC ID", "TOPIC NAME"); - topicTable.addRule(); - topicTable.addRow(topic.getTopicId(), topic.getName()); - topicTable.addRule(); - String render = topicTable.render(); - System.out.println(render); - - AsciiTable assignmentTable = new AsciiTable(); - assignmentTable.addRule(); - AT_Row row = assignmentTable.addRow(null, "ASSIGNMENT"); - row.getCells().get(1).getContext().setTextAlignment(TextAlignment.CENTER); - assignmentTable.addRule(); - assignmentTable.addRow("NODE ID", "QUEUE ID"); - assignmentTable.addRule(); - for (MessageQueueAssignment assignment : topic.getAssignmentsList()) { - assignmentTable.addRow(assignment.getNodeId(), assignment.getQueue().getQueueId()); - assignmentTable.addRule(); - } - render = assignmentTable.render(); - System.out.println(render); - - List ongoing = topic.getReassignmentsList(); - if (!ongoing.isEmpty()) { - AsciiTable reassignmentTable = new AsciiTable(); - assignmentTable.addRule(); - row = assignmentTable.addRow(null, "ON-GOING REASSIGNMENT"); - row.getCells().get(1).getContext().setTextAlignment(TextAlignment.CENTER); - reassignmentTable.addRule(); - reassignmentTable.addRow("SRC NODE ID", "DST NODE ID", "QUEUE ID"); - reassignmentTable.addRule(); - for (OngoingMessageQueueReassignment reassignment : ongoing) { - reassignmentTable.addRow(reassignment.getSrcNodeId(), reassignment.getDstNodeId(), reassignment.getQueue().getQueueId()); - reassignmentTable.addRule(); - } - } - + ConsoleHelper.printTable(topic); } return null; } diff --git a/cli/src/main/java/com/automq/rocketmq/cli/MQAdmin.java b/cli/src/main/java/com/automq/rocketmq/cli/MQAdmin.java index 6c8fc8fb8..f7918e0ba 100644 --- a/cli/src/main/java/com/automq/rocketmq/cli/MQAdmin.java +++ b/cli/src/main/java/com/automq/rocketmq/cli/MQAdmin.java @@ -27,6 +27,7 @@ subcommands = { CreateTopic.class, DescribeTopic.class, + UpdateTopic.class, CreateGroup.class, DescribeGroup.class, ProduceMessage.class, diff --git a/cli/src/main/java/com/automq/rocketmq/cli/UpdateTopic.java b/cli/src/main/java/com/automq/rocketmq/cli/UpdateTopic.java new file mode 100644 index 000000000..81032f7b3 --- /dev/null +++ b/cli/src/main/java/com/automq/rocketmq/cli/UpdateTopic.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.rocketmq.cli; + +import apache.rocketmq.controller.v1.Topic; +import apache.rocketmq.controller.v1.UpdateTopicRequest; +import com.automq.rocketmq.controller.metadata.GrpcControllerClient; +import com.google.common.base.Strings; +import java.util.concurrent.Callable; +import picocli.CommandLine; + +@CommandLine.Command(name = "updateTopic", mixinStandardHelpOptions = true, showDefaultValues = true) +public class UpdateTopic implements Callable { + + @CommandLine.Option(names = {"-i", "--topicId"}, description = "Topic ID", required = true) + long topicId; + + @CommandLine.Option(names = {"-t", "--topicName"}, description = "Topic name") + String topicName; + + @CommandLine.Option(names = {"-q", "--queueNumber"}, description = "Queue number") + int queueNumber = 0; + + @CommandLine.ParentCommand + MQAdmin mqAdmin; + + @Override + public Void call() throws Exception { + try (GrpcControllerClient client = new GrpcControllerClient()) { + Topic topic = client.describeTopic(mqAdmin.endpoint, topicId, null) + .join(); + if (null == topic) { + System.err.printf("Topic '%s' is not found%n%n", topicName); + return null; + } + + UpdateTopicRequest.Builder builder = UpdateTopicRequest.newBuilder() + .setTopicId(topicId); + if (queueNumber > topic.getCount()) { + builder.setCount(queueNumber); + } + + if (!Strings.isNullOrEmpty(topicName)) { + builder.setName(topicName); + } + client.updateTopic(mqAdmin.endpoint, builder.build()).join(); + topic = client.describeTopic(mqAdmin.endpoint, topicId, null).join(); + ConsoleHelper.printTable(topic); + } + return null; + } +} diff --git a/controller/src/main/java/com/automq/rocketmq/controller/ControllerServiceImpl.java b/controller/src/main/java/com/automq/rocketmq/controller/ControllerServiceImpl.java index 62e49d1e2..4540aea3c 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/ControllerServiceImpl.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/ControllerServiceImpl.java @@ -263,6 +263,7 @@ public void updateTopic(UpdateTopicRequest request, StreamObserver 0) { queueNumber = request.getCount(); } + this.metadataStore.updateTopic(request.getTopicId(), request.getName(), queueNumber, request.getAcceptMessageTypesList()) .whenComplete((res, e) -> { diff --git a/controller/src/main/java/com/automq/rocketmq/controller/metadata/GrpcControllerClient.java b/controller/src/main/java/com/automq/rocketmq/controller/metadata/GrpcControllerClient.java index 87402c326..b10eb5f55 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/metadata/GrpcControllerClient.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/metadata/GrpcControllerClient.java @@ -238,17 +238,20 @@ public void onFailure(@Nonnull Throwable t) { } @Override - public CompletableFuture describeTopic(String target, Long topicId, - String topicName) { + public CompletableFuture describeTopic(String target, Long topicId, String topicName) { - DescribeTopicRequest request = DescribeTopicRequest.newBuilder() - .setTopicId(null == topicId ? -1 : topicId) - .setTopicName(topicName) - .build(); + DescribeTopicRequest.Builder builder = DescribeTopicRequest.newBuilder(); + if (null != topicId && topicId > 0) { + builder.setTopicId(topicId); + } + + if (!Strings.isNullOrEmpty(topicName)) { + builder.setTopicName(topicName.trim()); + } CompletableFuture future = new CompletableFuture<>(); try { - Futures.addCallback(buildStubForTarget(target).describeTopic(request), new FutureCallback<>() { + Futures.addCallback(buildStubForTarget(target).describeTopic(builder.build()), new FutureCallback<>() { @Override public void onSuccess(DescribeTopicReply result) { if (result.getStatus().getCode() == Code.OK) { diff --git a/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/TopicManager.java b/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/TopicManager.java index 462df984c..87b27540e 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/TopicManager.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/TopicManager.java @@ -179,12 +179,18 @@ public CompletableFuture updateTopic(long t changed = true; } - // Update queue nums + // Update queue number if (null != queueNumber && !queueNumber.equals(topic.getQueueNum())) { if (queueNumber > topic.getQueueNum()) { - createQueues(IntStream.range(topic.getQueueNum(), queueNumber), topic.getId(), session); + IntStream range = IntStream.range(topic.getQueueNum(), queueNumber); + List assignments = createQueues(range, topic.getId(), session); + assignmentCache.apply(assignments); + topic.setQueueNum(queueNumber); + changed = true; + } else { + // Ignore queue number field if not enlarged + topic.setQueueNum(null); } - changed = true; } if (changed) { diff --git a/controller/src/main/resources/database/mapper/GroupMapper.xml b/controller/src/main/resources/database/mapper/GroupMapper.xml index 7f8dc8c80..740b56326 100644 --- a/controller/src/main/resources/database/mapper/GroupMapper.xml +++ b/controller/src/main/resources/database/mapper/GroupMapper.xml @@ -50,26 +50,27 @@ \ No newline at end of file diff --git a/controller/src/main/resources/database/mapper/TopicMapper.xml b/controller/src/main/resources/database/mapper/TopicMapper.xml index 5af4d2fa1..a24423f79 100644 --- a/controller/src/main/resources/database/mapper/TopicMapper.xml +++ b/controller/src/main/resources/database/mapper/TopicMapper.xml @@ -41,15 +41,10 @@ UPDATE topic - - status = #{status}, - - - queue_num = #{queueNum}, - - - accept_message_types = #{acceptMessageTypes}, - + status = #{status}, + name = #{name}, + queue_num = #{queueNum}, + accept_message_types = #{acceptMessageTypes}, WHERE id = #{id} @@ -57,10 +52,9 @@ DELETE FROM topic - WHERE 1 = 1 - - AND id = #{id} - + + id = #{id} + @@ -72,25 +66,18 @@ \ No newline at end of file