From a33f1d1c72a785744f586ce2e92a45f25dd5ebd5 Mon Sep 17 00:00:00 2001 From: Zhouxiang Zhan Date: Thu, 29 Feb 2024 17:25:02 +0800 Subject: [PATCH] [ISSUE #7875] Add constructor for ProxyTopicRouteData (#7876) --- .../route/ClusterTopicRouteService.java | 19 +------ .../service/route/LocalTopicRouteService.java | 23 +------- .../service/route/ProxyTopicRouteData.java | 56 +++++++++++++++++++ 3 files changed, 60 insertions(+), 38 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteService.java index 84252f8b8e7..a4df98971cb 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteService.java @@ -17,11 +17,10 @@ package org.apache.rocketmq.proxy.service.route; import java.util.List; +import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.proxy.common.Address; -import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory; import org.apache.rocketmq.proxy.common.ProxyContext; -import org.apache.rocketmq.remoting.protocol.route.BrokerData; import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; public class ClusterTopicRouteService extends TopicRouteService { @@ -39,21 +38,7 @@ public MessageQueueView getCurrentMessageQueueView(ProxyContext ctx, String topi public ProxyTopicRouteData getTopicRouteForProxy(ProxyContext ctx, List
requestHostAndPortList, String topicName) throws Exception { TopicRouteData topicRouteData = getAllMessageQueueView(ctx, topicName).getTopicRouteData(); - - ProxyTopicRouteData proxyTopicRouteData = new ProxyTopicRouteData(); - proxyTopicRouteData.setQueueDatas(topicRouteData.getQueueDatas()); - - for (BrokerData brokerData : topicRouteData.getBrokerDatas()) { - ProxyTopicRouteData.ProxyBrokerData proxyBrokerData = new ProxyTopicRouteData.ProxyBrokerData(); - proxyBrokerData.setCluster(brokerData.getCluster()); - proxyBrokerData.setBrokerName(brokerData.getBrokerName()); - for (Long brokerId : brokerData.getBrokerAddrs().keySet()) { - proxyBrokerData.getBrokerAddrs().put(brokerId, requestHostAndPortList); - } - proxyTopicRouteData.getBrokerDatas().add(proxyBrokerData); - } - - return proxyTopicRouteData; + return new ProxyTopicRouteData(topicRouteData, requestHostAndPortList); } @Override diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteService.java index aced15cee51..f2a42c0aed9 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteService.java @@ -17,10 +17,10 @@ package org.apache.rocketmq.proxy.service.route; import com.google.common.collect.Lists; -import com.google.common.net.HostAndPort; import java.util.HashMap; import java.util.List; import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; @@ -28,7 +28,6 @@ import org.apache.rocketmq.proxy.common.Address; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.config.ConfigurationManager; -import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory; import org.apache.rocketmq.remoting.protocol.route.BrokerData; import org.apache.rocketmq.remoting.protocol.route.QueueData; import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; @@ -62,25 +61,7 @@ public ProxyTopicRouteData getTopicRouteForProxy(ProxyContext ctx, List
String topicName) throws Exception { MessageQueueView messageQueueView = getAllMessageQueueView(ctx, topicName); TopicRouteData topicRouteData = messageQueueView.getTopicRouteData(); - - ProxyTopicRouteData proxyTopicRouteData = new ProxyTopicRouteData(); - proxyTopicRouteData.setQueueDatas(topicRouteData.getQueueDatas()); - - for (BrokerData brokerData : topicRouteData.getBrokerDatas()) { - ProxyTopicRouteData.ProxyBrokerData proxyBrokerData = new ProxyTopicRouteData.ProxyBrokerData(); - proxyBrokerData.setCluster(brokerData.getCluster()); - proxyBrokerData.setBrokerName(brokerData.getBrokerName()); - for (Long brokerId : brokerData.getBrokerAddrs().keySet()) { - String brokerAddr = brokerData.getBrokerAddrs().get(brokerId); - HostAndPort brokerHostAndPort = HostAndPort.fromString(brokerAddr); - HostAndPort grpcHostAndPort = HostAndPort.fromParts(brokerHostAndPort.getHost(), grpcPort); - - proxyBrokerData.getBrokerAddrs().put(brokerId, Lists.newArrayList(new Address(Address.AddressScheme.IPv4, grpcHostAndPort))); - } - proxyTopicRouteData.getBrokerDatas().add(proxyBrokerData); - } - - return proxyTopicRouteData; + return new ProxyTopicRouteData(topicRouteData, grpcPort); } @Override diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ProxyTopicRouteData.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ProxyTopicRouteData.java index da8b3f61127..63651f6fe81 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ProxyTopicRouteData.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ProxyTopicRouteData.java @@ -16,6 +16,8 @@ */ package org.apache.rocketmq.proxy.service.route; +import com.google.common.collect.Lists; +import com.google.common.net.HostAndPort; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -27,6 +29,60 @@ import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; public class ProxyTopicRouteData { + public ProxyTopicRouteData() { + } + + public ProxyTopicRouteData(TopicRouteData topicRouteData) { + this.queueDatas = topicRouteData.getQueueDatas(); + this.brokerDatas = new ArrayList<>(); + + for (BrokerData brokerData : topicRouteData.getBrokerDatas()) { + ProxyTopicRouteData.ProxyBrokerData proxyBrokerData = new ProxyTopicRouteData.ProxyBrokerData(); + proxyBrokerData.setCluster(brokerData.getCluster()); + proxyBrokerData.setBrokerName(brokerData.getBrokerName()); + for (Long brokerId : brokerData.getBrokerAddrs().keySet()) { + String brokerAddr = brokerData.getBrokerAddrs().get(brokerId); + HostAndPort hostAndPort = HostAndPort.fromString(brokerAddr); + + proxyBrokerData.getBrokerAddrs().put(brokerId, Lists.newArrayList(new Address(Address.AddressScheme.IPv4, hostAndPort))); + } + this.brokerDatas.add(proxyBrokerData); + } + } + + public ProxyTopicRouteData(TopicRouteData topicRouteData, int port) { + this.queueDatas = topicRouteData.getQueueDatas(); + this.brokerDatas = new ArrayList<>(); + + for (BrokerData brokerData : topicRouteData.getBrokerDatas()) { + ProxyTopicRouteData.ProxyBrokerData proxyBrokerData = new ProxyTopicRouteData.ProxyBrokerData(); + proxyBrokerData.setCluster(brokerData.getCluster()); + proxyBrokerData.setBrokerName(brokerData.getBrokerName()); + for (Long brokerId : brokerData.getBrokerAddrs().keySet()) { + String brokerAddr = brokerData.getBrokerAddrs().get(brokerId); + HostAndPort brokerHostAndPort = HostAndPort.fromString(brokerAddr); + HostAndPort hostAndPort = HostAndPort.fromParts(brokerHostAndPort.getHost(), port); + + proxyBrokerData.getBrokerAddrs().put(brokerId, Lists.newArrayList(new Address(Address.AddressScheme.IPv4, hostAndPort))); + } + this.brokerDatas.add(proxyBrokerData); + } + } + + public ProxyTopicRouteData(TopicRouteData topicRouteData, List
requestHostAndPortList) { + this.queueDatas = topicRouteData.getQueueDatas(); + this.brokerDatas = new ArrayList<>(); + + for (BrokerData brokerData : topicRouteData.getBrokerDatas()) { + ProxyTopicRouteData.ProxyBrokerData proxyBrokerData = new ProxyTopicRouteData.ProxyBrokerData(); + proxyBrokerData.setCluster(brokerData.getCluster()); + proxyBrokerData.setBrokerName(brokerData.getBrokerName()); + for (Long brokerId : brokerData.getBrokerAddrs().keySet()) { + proxyBrokerData.getBrokerAddrs().put(brokerId, requestHostAndPortList); + } + this.brokerDatas.add(proxyBrokerData); + } + } public static class ProxyBrokerData { private String cluster;