Skip to content

Commit

Permalink
[ISSUE apache#7875] Add constructor for ProxyTopicRouteData (apache#7876
Browse files Browse the repository at this point in the history
)
  • Loading branch information
drpmma authored Feb 29, 2024
1 parent eed303d commit a33f1d1
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@
package org.apache.rocketmq.proxy.service.route;

import java.util.List;
import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.proxy.common.Address;
import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;

public class ClusterTopicRouteService extends TopicRouteService {
Expand All @@ -39,21 +38,7 @@ public MessageQueueView getCurrentMessageQueueView(ProxyContext ctx, String topi
public ProxyTopicRouteData getTopicRouteForProxy(ProxyContext ctx, List<Address> requestHostAndPortList,
String topicName) throws Exception {
TopicRouteData topicRouteData = getAllMessageQueueView(ctx, topicName).getTopicRouteData();

ProxyTopicRouteData proxyTopicRouteData = new ProxyTopicRouteData();
proxyTopicRouteData.setQueueDatas(topicRouteData.getQueueDatas());

for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
ProxyTopicRouteData.ProxyBrokerData proxyBrokerData = new ProxyTopicRouteData.ProxyBrokerData();
proxyBrokerData.setCluster(brokerData.getCluster());
proxyBrokerData.setBrokerName(brokerData.getBrokerName());
for (Long brokerId : brokerData.getBrokerAddrs().keySet()) {
proxyBrokerData.getBrokerAddrs().put(brokerId, requestHostAndPortList);
}
proxyTopicRouteData.getBrokerDatas().add(proxyBrokerData);
}

return proxyTopicRouteData;
return new ProxyTopicRouteData(topicRouteData, requestHostAndPortList);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,17 @@
package org.apache.rocketmq.proxy.service.route;

import com.google.common.collect.Lists;
import com.google.common.net.HostAndPort;
import java.util.HashMap;
import java.util.List;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.proxy.common.Address;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.remoting.protocol.route.QueueData;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
Expand Down Expand Up @@ -62,25 +61,7 @@ public ProxyTopicRouteData getTopicRouteForProxy(ProxyContext ctx, List<Address>
String topicName) throws Exception {
MessageQueueView messageQueueView = getAllMessageQueueView(ctx, topicName);
TopicRouteData topicRouteData = messageQueueView.getTopicRouteData();

ProxyTopicRouteData proxyTopicRouteData = new ProxyTopicRouteData();
proxyTopicRouteData.setQueueDatas(topicRouteData.getQueueDatas());

for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
ProxyTopicRouteData.ProxyBrokerData proxyBrokerData = new ProxyTopicRouteData.ProxyBrokerData();
proxyBrokerData.setCluster(brokerData.getCluster());
proxyBrokerData.setBrokerName(brokerData.getBrokerName());
for (Long brokerId : brokerData.getBrokerAddrs().keySet()) {
String brokerAddr = brokerData.getBrokerAddrs().get(brokerId);
HostAndPort brokerHostAndPort = HostAndPort.fromString(brokerAddr);
HostAndPort grpcHostAndPort = HostAndPort.fromParts(brokerHostAndPort.getHost(), grpcPort);

proxyBrokerData.getBrokerAddrs().put(brokerId, Lists.newArrayList(new Address(Address.AddressScheme.IPv4, grpcHostAndPort)));
}
proxyTopicRouteData.getBrokerDatas().add(proxyBrokerData);
}

return proxyTopicRouteData;
return new ProxyTopicRouteData(topicRouteData, grpcPort);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.rocketmq.proxy.service.route;

import com.google.common.collect.Lists;
import com.google.common.net.HostAndPort;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -27,6 +29,60 @@
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;

public class ProxyTopicRouteData {
public ProxyTopicRouteData() {
}

public ProxyTopicRouteData(TopicRouteData topicRouteData) {
this.queueDatas = topicRouteData.getQueueDatas();
this.brokerDatas = new ArrayList<>();

for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
ProxyTopicRouteData.ProxyBrokerData proxyBrokerData = new ProxyTopicRouteData.ProxyBrokerData();
proxyBrokerData.setCluster(brokerData.getCluster());
proxyBrokerData.setBrokerName(brokerData.getBrokerName());
for (Long brokerId : brokerData.getBrokerAddrs().keySet()) {
String brokerAddr = brokerData.getBrokerAddrs().get(brokerId);
HostAndPort hostAndPort = HostAndPort.fromString(brokerAddr);

proxyBrokerData.getBrokerAddrs().put(brokerId, Lists.newArrayList(new Address(Address.AddressScheme.IPv4, hostAndPort)));
}
this.brokerDatas.add(proxyBrokerData);
}
}

public ProxyTopicRouteData(TopicRouteData topicRouteData, int port) {
this.queueDatas = topicRouteData.getQueueDatas();
this.brokerDatas = new ArrayList<>();

for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
ProxyTopicRouteData.ProxyBrokerData proxyBrokerData = new ProxyTopicRouteData.ProxyBrokerData();
proxyBrokerData.setCluster(brokerData.getCluster());
proxyBrokerData.setBrokerName(brokerData.getBrokerName());
for (Long brokerId : brokerData.getBrokerAddrs().keySet()) {
String brokerAddr = brokerData.getBrokerAddrs().get(brokerId);
HostAndPort brokerHostAndPort = HostAndPort.fromString(brokerAddr);
HostAndPort hostAndPort = HostAndPort.fromParts(brokerHostAndPort.getHost(), port);

proxyBrokerData.getBrokerAddrs().put(brokerId, Lists.newArrayList(new Address(Address.AddressScheme.IPv4, hostAndPort)));
}
this.brokerDatas.add(proxyBrokerData);
}
}

public ProxyTopicRouteData(TopicRouteData topicRouteData, List<Address> requestHostAndPortList) {
this.queueDatas = topicRouteData.getQueueDatas();
this.brokerDatas = new ArrayList<>();

for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
ProxyTopicRouteData.ProxyBrokerData proxyBrokerData = new ProxyTopicRouteData.ProxyBrokerData();
proxyBrokerData.setCluster(brokerData.getCluster());
proxyBrokerData.setBrokerName(brokerData.getBrokerName());
for (Long brokerId : brokerData.getBrokerAddrs().keySet()) {
proxyBrokerData.getBrokerAddrs().put(brokerId, requestHostAndPortList);
}
this.brokerDatas.add(proxyBrokerData);
}
}

public static class ProxyBrokerData {
private String cluster;
Expand Down

0 comments on commit a33f1d1

Please sign in to comment.