diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml
new file mode 100644
index 0000000..ee9f695
--- /dev/null
+++ b/.idea/inspectionProfiles/Project_Default.xml
@@ -0,0 +1,68 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/leetcode/editor.xml b/.idea/leetcode/editor.xml
new file mode 100644
index 0000000..2527884
--- /dev/null
+++ b/.idea/leetcode/editor.xml
@@ -0,0 +1,20 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/leetcode/statistics.xml b/.idea/leetcode/statistics.xml
new file mode 100644
index 0000000..307aec8
--- /dev/null
+++ b/.idea/leetcode/statistics.xml
@@ -0,0 +1,20 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/nowcoder/editor.xml b/.idea/nowcoder/editor.xml
new file mode 100644
index 0000000..0159367
--- /dev/null
+++ b/.idea/nowcoder/editor.xml
@@ -0,0 +1,21 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/broker/src/main/java/me/deve/streamq/broker/handler/MessageServerHandler.java b/broker/src/main/java/me/deve/streamq/broker/handler/MessageServerHandler.java
index 8038219..1e5f1b8 100644
--- a/broker/src/main/java/me/deve/streamq/broker/handler/MessageServerHandler.java
+++ b/broker/src/main/java/me/deve/streamq/broker/handler/MessageServerHandler.java
@@ -113,10 +113,6 @@ else if(messageType ==FunctionMessageType.PULL_MESSAGE){
}
- private FunctionMessage getFunctionMessage(byte[] array) {
- KryoSerializer kryoSerializer = new KryoSerializer();
- return kryoSerializer.deserialize(array, FunctionMessage.class);
- }
public byte[] getMessageBytes(Object msg){
ByteBuf byteBuf= (ByteBuf) msg;
diff --git a/client/src/main/java/me/deve/streamq/client/loadbalance/BrokerLoadBalance.java b/client/src/main/java/me/deve/streamq/client/loadbalance/BrokerLoadBalance.java
new file mode 100644
index 0000000..d8c45bd
--- /dev/null
+++ b/client/src/main/java/me/deve/streamq/client/loadbalance/BrokerLoadBalance.java
@@ -0,0 +1,103 @@
+//-*- coding =utf-8 -*-
+//@Time : 2023/6/29
+//@Author: 邓闽川
+//@File ClientUtil.java
+//@software:IntelliJ IDEA
+package me.deve.streamq.client.loadbalance;
+
+import cn.hutool.core.util.RandomUtil;
+import lombok.extern.slf4j.Slf4j;
+import me.deve.streamq.client.msgstrategy.BrokerChooseStrategy;
+import me.deve.streamq.client.producer.DefaultMQProducer;
+import me.deve.streamq.common.address.KryoInetAddress;
+import me.deve.streamq.common.component.Broker;
+import me.deve.streamq.common.message.Message;
+
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.*;
+
+
+@Slf4j
+public class BrokerLoadBalance {
+
+ private DefaultMQProducer producer;
+ /**
+ * RoundRobin offset
+ */
+ private int offset=0;
+
+ private ConsistentHashingWithVirtualNode consistentHashingWithVirtualNode;
+
+
+ public BrokerLoadBalance(){
+
+ }
+ public BrokerLoadBalance(DefaultMQProducer producer){
+ this.producer=producer;
+ }
+ public KryoInetAddress chooseBroker(BrokerChooseStrategy strategy, String topicName, Message message){
+ List brokers = producer.topicRouteData.get(topicName);
+ return
+ switch (strategy) {
+ case ROUND_ROBIN-> chooseBrokerByRoundRobin(brokers);
+ case RANDOM -> chooseBrokerByRandom(brokers);
+ case HASH -> chooseBrokerByHash(brokers);
+ case CONSISTENCY_HASH -> chooseBrokerByConsistencyHash(brokers,message);
+ };
+ }
+
+ /**
+ * 一致性hash算法,即使broker数量发生变化前面的映射也不会受到影响
+ * 1.节点数量发生变化时尽量保证相同线程相同tags消息发送到同一节点,单tag 单线程发送没必要使用此负载均衡方式
+ * 2.只有少量生产者会被重定向到不同broker节点,broker端压力不会突然改变
+ * 3.使负载均衡更加平滑
+ * 4.在分布式缓存中使用效果最好,扩容缩容时防止过多数据迁移和缓存雪崩
+ * 缺点:需要的消耗较大
+ * @param brokers
+ * @return
+ */
+ private KryoInetAddress chooseBrokerByConsistencyHash(List brokers,Message message) {
+ Broker broker = consistentHashingWithVirtualNode.getNode(message, brokers);
+ return broker.getInetAddress();
+ }
+
+ private KryoInetAddress chooseBrokerByHash(List brokers) {
+ try {
+ String hostAddress = InetAddress.getLocalHost().getHostAddress();
+ int hashCode = hash(hostAddress);
+ int size = brokers.size();
+ return brokers.get(hashCode%size).getInetAddress();
+ } catch (UnknownHostException e) {
+ log.error("获取本机ip错误");
+ }
+
+ return null;
+ }
+
+ private KryoInetAddress chooseBrokerByRandom(List brokers) {
+ if(brokers==null){
+ return null;
+ }
+ int index = RandomUtil.randomInt(0, brokers.size());
+ return brokers.get(index).getInetAddress();
+ }
+
+ /**
+ * 轮询
+ * @param brokers
+ * @return
+ */
+ private KryoInetAddress chooseBrokerByRoundRobin(List brokers) {
+ offset%=brokers.size();
+ return brokers.get(offset++).getInetAddress();
+
+ }
+ public static int hash(Object obj) {
+ return obj.hashCode()>>16^obj.hashCode();
+ }
+
+
+
+}
diff --git a/client/src/main/java/me/deve/streamq/client/loadbalance/ConsistentHashingWithVirtualNode.java b/client/src/main/java/me/deve/streamq/client/loadbalance/ConsistentHashingWithVirtualNode.java
new file mode 100644
index 0000000..8040138
--- /dev/null
+++ b/client/src/main/java/me/deve/streamq/client/loadbalance/ConsistentHashingWithVirtualNode.java
@@ -0,0 +1,63 @@
+//-*- coding =utf-8 -*-
+//@Time : 2023/8/26
+//@Author: 邓闽川
+//@File ConsistentHashingWithVirtualNode.java
+//@software:IntelliJ IDEA
+package me.deve.streamq.client.loadbalance;
+
+import me.deve.streamq.common.component.Broker;
+import me.deve.streamq.common.message.Message;
+
+import java.util.List;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+public class ConsistentHashingWithVirtualNode {
+ private List brokers;
+
+ private final SortedMap hashRing=new TreeMap<>();
+
+ private final int VIRTUAL_NODE_NUMBER=10;
+
+
+
+
+ public ConsistentHashingWithVirtualNode(List brokers){
+ this.brokers = brokers;
+ }
+
+ public void buildHashRing(List brokers){
+ for (Broker broker : brokers) {
+ for(int i=0;i brokers){
+ //hashRing发生了变化就重建
+ if(this.brokers==null||!this.brokers.equals(brokers)){
+ this.brokers=brokers;
+ buildHashRing(brokers);
+ }
+ //计算key
+ int key1 = BrokerLoadBalance.hash(message.getTags());
+ int key2=BrokerLoadBalance.hash(Thread.currentThread());
+ int key=key1^key2;
+ SortedMap subMap = hashRing.tailMap(key);
+ //说明是第一个
+ if(subMap.isEmpty()){
+ VirtualBroker virtualBroker = hashRing.get(hashRing.firstKey());
+ return virtualBroker.getBroker();
+ }else{
+ VirtualBroker virtualBroker = hashRing.get(subMap.firstKey());
+ return virtualBroker.getBroker();
+ }
+ }
+}
diff --git a/client/src/main/java/me/deve/streamq/client/loadbalance/VirtualBroker.java b/client/src/main/java/me/deve/streamq/client/loadbalance/VirtualBroker.java
new file mode 100644
index 0000000..07704ee
--- /dev/null
+++ b/client/src/main/java/me/deve/streamq/client/loadbalance/VirtualBroker.java
@@ -0,0 +1,21 @@
+//-*- coding =utf-8 -*-
+//@Time : 2023/8/26
+//@Author: 邓闽川
+//@File VitualBroker.java
+//@software:IntelliJ IDEA
+package me.deve.streamq.client.loadbalance;
+
+import lombok.Getter;
+import me.deve.streamq.common.component.Broker;
+
+public class VirtualBroker {
+ @Getter
+ private Broker broker;
+
+ private Integer virtual_id;
+
+ public VirtualBroker(Broker broker, int i) {
+ this.broker=broker;
+ this.virtual_id=i;
+ }
+}
diff --git a/client/src/main/java/me/deve/streamq/client/msgstrategy/BrokerChooseStrategy.java b/client/src/main/java/me/deve/streamq/client/msgstrategy/BrokerChooseStrategy.java
index b45bbf5..2570704 100644
--- a/client/src/main/java/me/deve/streamq/client/msgstrategy/BrokerChooseStrategy.java
+++ b/client/src/main/java/me/deve/streamq/client/msgstrategy/BrokerChooseStrategy.java
@@ -11,6 +11,7 @@
public enum BrokerChooseStrategy {
ROUND_ROBIN,
RANDOM,
- HASH
+ HASH,
+ CONSISTENCY_HASH
}
diff --git a/client/src/main/java/me/deve/streamq/client/producer/DefaultMQProducer.java b/client/src/main/java/me/deve/streamq/client/producer/DefaultMQProducer.java
index 2665fbb..2c23d8c 100644
--- a/client/src/main/java/me/deve/streamq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/me/deve/streamq/client/producer/DefaultMQProducer.java
@@ -12,6 +12,7 @@
import lombok.extern.slf4j.Slf4j;
import me.deve.streamq.client.handler.ServerFindClientHandler;
import me.deve.streamq.client.handler.ProduceHandler;
+import me.deve.streamq.client.loadbalance.BrokerLoadBalance;
import me.deve.streamq.common.message.Message;
import me.deve.streamq.client.msgstrategy.BrokerChooseStrategy;
import me.deve.streamq.common.address.KryoInetAddress;
@@ -19,13 +20,12 @@
import me.deve.streamq.common.config.NettyClientConfig;
import me.deve.streamq.common.util.IdDistributor;
import me.deve.streamq.remoting.netty.NettyClient;
-import me.deve.streamq.remoting.thread.NettyClientThread;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.*;
-import static me.deve.streamq.client.util.ClientUtil.chooseBroker;
+
/**
* 默认生产者
@@ -53,7 +53,7 @@ public class DefaultMQProducer implements MQProducer{
private final ProduceHandler produceHandler=new ProduceHandler();
- public volatile HashMap> topicRouteData;
+ public volatile HashMap> topicRouteData;
private CountDownLatch latch=new CountDownLatch(1);
@@ -90,10 +90,11 @@ public void start() {
} catch (InterruptedException e) {
log.error("can not get topicRouteData");
}
- if(defaultTopic!=null){
- defaultBrokerAddress = chooseBroker(strategy,topicRouteData, defaultTopic);
- defaultSetting();
- }
+// if(defaultTopic!=null){
+// BrokerLoadBalance brokerLoadBalance = new BrokerLoadBalance();
+// defaultBrokerAddress = brokerLoadBalance.chooseBroker(strategy, defaultTopic);
+// defaultSetting();
+// }
}
@@ -124,16 +125,15 @@ public void shutdown(){
public SendResult send(final Message message){
return sendMessage(message,this.strategy);
}
-
+ private final BrokerLoadBalance brokerLoadBalance = new BrokerLoadBalance(this);
private SendResult sendMessage(Message message,BrokerChooseStrategy strategy) {
idDistributor.setIdByAnnotation(message);
- KryoInetAddress brokerAddress = chooseBroker(strategy,topicRouteData, message.getTopic());
+ KryoInetAddress brokerAddress = brokerLoadBalance.chooseBroker(strategy,message.getTopic(),message);
messageSendClientConfig=new NettyClientConfig(brokerAddress);
messageSendClient.setNettyClientConfig(messageSendClientConfig);
messageSendClient.connectWithoutWaitForClose();
produceHandler.sendMsg(message);
//todo:obtain return result
-
return null;
}
@@ -141,15 +141,15 @@ public SendResult sendWithStrategy(final Message message,BrokerChooseStrategy st
return sendMessage(message,strategy1);
}
- /**
- * send to default topic
- * @param message
- * @return
- */
- public SendResult continuousSend(final Message message){
- produceHandler.sendMsg(message);
- return null;
- }
+// /**
+// * send to default topic
+// * @param message
+// * @return
+// */
+// public SendResult continuousSend(final Message message){
+// produceHandler.sendMsg(message);
+// return null;
+// }
diff --git a/client/src/main/java/me/deve/streamq/client/util/ClientUtil.java b/client/src/main/java/me/deve/streamq/client/util/ClientUtil.java
deleted file mode 100644
index 23f8290..0000000
--- a/client/src/main/java/me/deve/streamq/client/util/ClientUtil.java
+++ /dev/null
@@ -1,50 +0,0 @@
-//-*- coding =utf-8 -*-
-//@Time : 2023/6/29
-//@Author: 邓闽川
-//@File ClientUtil.java
-//@software:IntelliJ IDEA
-package me.deve.streamq.client.util;
-
-import cn.hutool.core.util.RandomUtil;
-import me.deve.streamq.client.msgstrategy.BrokerChooseStrategy;
-import me.deve.streamq.common.address.KryoInetAddress;
-import me.deve.streamq.common.component.Broker;
-
-import java.util.HashMap;
-import java.util.List;
-
-public class ClientUtil {
-
- public static KryoInetAddress chooseBroker(BrokerChooseStrategy strategy, HashMap> topicRouteData, String topicName){
- List brokers = topicRouteData.get(topicName);
- return
- switch (strategy) {
- case ROUND_ROBIN-> chooseBrokerByRoundRobin(brokers);
- case RANDOM -> chooseBrokerByRandom(brokers);
- case HASH -> chooseBrokerByHash(brokers);
- };
- }
-
- private static KryoInetAddress chooseBrokerByHash(List brokers) {
-
- return null;
- }
-
- private static KryoInetAddress chooseBrokerByRandom(List brokers) {
- if(brokers==null){
- return null;
- }
- int index = RandomUtil.randomInt(0, brokers.size());
- return brokers.get(index).getInetAddress();
- }
-
- private static KryoInetAddress chooseBrokerByRoundRobin(List brokers) {
- return null;
-
-
-
-
- }
-
-
-}
diff --git a/client/src/test/java/me/deve/streamq/client/ClientApplicationTests.java b/client/src/test/java/me/deve/streamq/client/ClientApplicationTests.java
index fc6e421..8ece411 100644
--- a/client/src/test/java/me/deve/streamq/client/ClientApplicationTests.java
+++ b/client/src/test/java/me/deve/streamq/client/ClientApplicationTests.java
@@ -1,17 +1,17 @@
package me.deve.streamq.client;
-import cn.hutool.core.lang.hash.Hash;
import cn.hutool.core.net.NetUtil;
+import me.deve.streamq.client.loadbalance.BrokerLoadBalance;
import me.deve.streamq.common.message.Message;
import me.deve.streamq.common.util.IdWorker;
import me.deve.streamq.common.util.serializer.KryoSerializer;
import org.junit.jupiter.api.Test;
-import org.springframework.boot.test.context.SpringBootTest;
import java.io.File;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.util.*;
@@ -53,8 +53,11 @@ void testMac(){
void testIdWorker(){
IdWorker idWorker = new IdWorker();
System.out.println(idWorker.nextId());
+ }
+ @Test
+ void testAddress() throws UnknownHostException {
-
+ System.out.println(BrokerLoadBalance.hash(InetAddress.getLocalHost().getHostAddress()));
}
diff --git a/common/src/main/java/me/deve/streamq/common/message/Message.java b/common/src/main/java/me/deve/streamq/common/message/Message.java
index da988c6..7da5d39 100644
--- a/common/src/main/java/me/deve/streamq/common/message/Message.java
+++ b/common/src/main/java/me/deve/streamq/common/message/Message.java
@@ -54,6 +54,7 @@ public void setId(long id){
/**
* 消息所属tag,可用于筛选消息
*/
+ @Getter
private String []tags;
/**
* 代表消息的关键词
diff --git a/nameserver/README.md b/nameserver/README.md
index e69de29..1303f90 100644
--- a/nameserver/README.md
+++ b/nameserver/README.md
@@ -0,0 +1,14 @@
+# nameserver cluster模式
+
+## 采用协议:
+
+gossip
+
+## 实现:
+
+1. nameserver启动时读取默认路径下的namserverAddress.conf配置,用户可在配置中任意配置3~5个nameserver种子节点地址。
+2. 向这些地址定时发送namserver上线信息(udp协议),直到回复ack
+3. namserver接收彼此的上线信息,如果接收到了nameserver上线信息则向自身配置的几个nameserver节点发送上线信息,接收到broker注册信息也向配置的节点发送broker注册信息,更新broker表
+4. 对broker使用定时任务监测,一段时间未更新则剔除
+5. 如果producer拉取broker地址信息先在本机进行查找,如果没找到再向配置文件中的目标server发起查找请求
+
diff --git a/nameserver/broker.conf b/nameserver/broker.conf
index 28edbd9..4fb72f7 100644
--- a/nameserver/broker.conf
+++ b/nameserver/broker.conf
@@ -1 +1 @@
-{"Broker(id=1692844588048, name=broker index:1692844588048, inetAddress=KryoInetAddress(address=192.168.56.1, port=10011), topicName=test topic)":1692844659929}
\ No newline at end of file
+{"Broker(id=1693030836517, name=broker index:1693030836517, inetAddress=KryoInetAddress(address=192.168.56.1, port=10011), topicName=test topic)":1693030859101}
\ No newline at end of file
diff --git a/nameserver/src/main/java/me/deve/streamq/nameserver/NameserverController.java b/nameserver/src/main/java/me/deve/streamq/nameserver/NameserverController.java
index 17fcbe0..fcdac00 100644
--- a/nameserver/src/main/java/me/deve/streamq/nameserver/NameserverController.java
+++ b/nameserver/src/main/java/me/deve/streamq/nameserver/NameserverController.java
@@ -5,6 +5,7 @@
//@software:IntelliJ IDEA
package me.deve.streamq.nameserver;
+import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.nio.NioEventLoopGroup;
import me.deve.streamq.common.config.NameserverConfig;
@@ -16,6 +17,8 @@
import me.deve.streamq.nameserver.handler.NameServerDealHeartUploadHandler;
import me.deve.streamq.remoting.netty.NettyClient;
import me.deve.streamq.remoting.netty.NettyServer;
+import me.deve.streamq.remoting.thread.NettyClientThread;
+import me.deve.streamq.remoting.thread.NettyServerThread;
import java.net.InetSocketAddress;
import java.util.List;
@@ -33,20 +36,24 @@ public class NameserverController {
public NameserverController(){
}
+
+ private boolean useCluster=false;
public NameserverController(NettyServerConfig nettyServerConfig,NettyClientConfig nettyClientConfig,NameserverConfig nameserverConfig){
this.nettyServerConfig = nettyServerConfig;
this.nameserverConfig=nameserverConfig;
this.nettyClientConfig=nettyClientConfig;
initializeNetworkComponentsForSingle();
}
- public NameserverController(NettyServerConfig nettyServerConfig, NettyClientConfig nettyClientConfig, NameserverConfig nameserverConfig, List targetAddress){
- this.nettyServerConfig = nettyServerConfig;
- this.nameserverConfig=nameserverConfig;
- this.nettyClientConfig=nettyClientConfig;
- initializeNetworkComponentsForCluster();
- }
public void start(){
- nettyServer.start();
+ NettyServerThread nettyServerThread = new NettyServerThread(nettyServer);
+ nettyServerThread.start();
+ if(useCluster){
+ NioEventLoopGroup bossGroup = new NioEventLoopGroup();
+ NioEventLoopGroup workerGroup = new NioEventLoopGroup();
+ Bootstrap bootstrap=new Bootstrap();
+ NettyClientThread nettyClientThread = new NettyClientThread(nettyClient);
+ nettyClientThread.start();
+ }
/**
* shutdown gracefully
*/
@@ -69,16 +76,7 @@ private void initializeNetworkComponentsForSingle(){
nettyServer = new NettyServer(bossGroup, workerGroup, bootstrap, nettyServerConfig, nameServerDealHeartUploadHandler,nameServerDealFindingHandler);
}
- private void initializeNetworkComponentsForCluster(){
- NioEventLoopGroup bossGroup = new NioEventLoopGroup();
- NioEventLoopGroup workerGroup = new NioEventLoopGroup();
- ServerBootstrap bootstrap = new ServerBootstrap();
- NameServerDealHeartUploadHandler nameServerDealHeartUploadHandler = new NameServerDealHeartUploadHandler();
- NameServerDealFindingHandler nameServerDealFindingHandler = new NameServerDealFindingHandler();
- GossipHandler gossipHandler = new GossipHandler();
- //addLast mode to add handler
- nettyServer = new NettyServer(bossGroup, workerGroup, bootstrap, nettyServerConfig, nameServerDealHeartUploadHandler,nameServerDealFindingHandler,gossipHandler);
- }
+
public void shutdown(){
nettyServer.shutdown();
diff --git a/nameserver/src/main/java/me/deve/streamq/nameserver/NameserverStartup.java b/nameserver/src/main/java/me/deve/streamq/nameserver/NameserverStartup.java
index 285332c..cbc8695 100644
--- a/nameserver/src/main/java/me/deve/streamq/nameserver/NameserverStartup.java
+++ b/nameserver/src/main/java/me/deve/streamq/nameserver/NameserverStartup.java
@@ -11,7 +11,9 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -26,41 +28,22 @@ public class NameserverStartup {
private static NameserverConfig nameserverConfig;
- public static List targetAddressList;
+ public static Map targetAddressMap;
public static void main(String[] args) throws IOException {
- //todo:解析args决定使用单机版还是集群部署
-// startupSingle(args);
- startupCluster(args);
-
-
-
+ startupSingle(args);
}
private static void startupCluster(String[] args) throws IOException {
- //读取配置文件
File seedFile = new File("nameserver/seedAddress.conf");
if(!seedFile.exists()){
throw new IOException("please set seed nameserver address");
}
- List targetAddressList = loadSeedIpAndPort(seedFile);
-
-
-
-
+ targetAddressMap = loadSeedIpAndPort(seedFile);
}
- /**
- * 单机启动
- * @param args
- */
private static void startupSingle(String []args) throws IOException {
parseCommandlineAndConfigFile(args);
- File seedFile = new File("nameserver/seedAddress.conf");
- if(!seedFile.exists()){
- throw new IOException("please set seed nameserver address");
- }
- targetAddressList = loadSeedIpAndPort(seedFile);
NameserverController nameserverController = createAndStartNameserverControllerSingleton();
}
@@ -71,11 +54,6 @@ private static NameserverController createAndStartNameserverControllerSingleton(
return nameserverController;
}
- private static NameserverController createAndStartNameserverControllerCluster(List targetAddressList) {
- NameserverController nameserverController = new NameserverController(nettyServerConfig, nettyClientConfig, nameserverConfig,targetAddressList);
- start(nameserverController);
- return nameserverController;
- }
private static void start(NameserverController nameserverController) {
nameserverController.start();
}
@@ -89,26 +67,27 @@ private static void parseCommandlineAndConfigFile(String[] args) {
//todo:parse args and set them to nettyClientConfig nettyServerConfig
}
- public static List loadSeedIpAndPort(File file) {
- List inetSocketAddressArrayList = new ArrayList<>();
+ public static Map loadSeedIpAndPort(File file) {
+ Map inetSocketAddressArrayMap = new HashMap<>();
try {
BufferedReader reader = new BufferedReader(new FileReader(file));
String line = null;
while((line=reader.readLine())!=null){
- Pattern pattern = Pattern.compile("(\\d+\\.\\d+\\.\\d+\\.\\d+):(\\d+)");
+ Pattern pattern = Pattern.compile("^\\w+,(\\d+\\.\\d+\\.\\d+\\.\\d+):(\\d+)$");
Matcher matcher = pattern.matcher(line);
if(matcher.find()){
- String ip=matcher.group(1);
- int port= Integer.parseInt(matcher.group(2));
+ String name=matcher.group(1);
+ String ip=matcher.group(2);
+ int port= Integer.parseInt(matcher.group(3));
InetSocketAddress inetSocketAddress = new InetSocketAddress(ip, port);
- inetSocketAddressArrayList.add(inetSocketAddress);
+ inetSocketAddressArrayMap.put(name,inetSocketAddress);
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
- return inetSocketAddressArrayList;
+ return inetSocketAddressArrayMap;
}
}
diff --git a/nameserver/src/main/java/me/deve/streamq/nameserver/gossip/GossipHandler.java b/nameserver/src/main/java/me/deve/streamq/nameserver/gossip/GossipHandler.java
index 05f53b7..da1ab71 100644
--- a/nameserver/src/main/java/me/deve/streamq/nameserver/gossip/GossipHandler.java
+++ b/nameserver/src/main/java/me/deve/streamq/nameserver/gossip/GossipHandler.java
@@ -18,6 +18,7 @@
import me.deve.streamq.common.message.FunctionMessage;
import me.deve.streamq.common.message.FunctionMessageType;
import me.deve.streamq.common.util.serializer.KryoSerializer;
+import me.deve.streamq.nameserver.NameserverStartup;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@@ -31,6 +32,9 @@ public class GossipHandler extends ChannelInboundHandlerAdapter {
private final KryoSerializer kryoSerializer = new KryoSerializer();
private final ByteBufAllocator allocator= PooledByteBufAllocator.DEFAULT;
+
+
+
/**
* nameserver's name and address
*/
diff --git a/nameserver/src/main/java/me/deve/streamq/nameserver/handler/NameServerDealHeartUploadHandler.java b/nameserver/src/main/java/me/deve/streamq/nameserver/handler/NameServerDealHeartUploadHandler.java
index 6e29eb2..8824637 100644
--- a/nameserver/src/main/java/me/deve/streamq/nameserver/handler/NameServerDealHeartUploadHandler.java
+++ b/nameserver/src/main/java/me/deve/streamq/nameserver/handler/NameServerDealHeartUploadHandler.java
@@ -20,10 +20,9 @@
import me.deve.streamq.nameserver.timertask.DeleteTimerTask;
import me.deve.streamq.nameserver.timertask.PersistTimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -42,6 +41,7 @@ public class NameServerDealHeartUploadHandler extends ChannelInboundHandlerAdapt
private final PersistTimerTask persistTimerTask=new PersistTimerTask(livingBrokers);
+ private ThreadPoolExecutor infectThreadPool=new ThreadPoolExecutor(5,10,1000,TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(10),new ThreadPoolExecutor.AbortPolicy());
private final ThreadPoolExecutor threadPool=new ThreadPoolExecutor(
5,
10,
@@ -73,8 +73,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
use file to store data
*/
livingBrokers.put(message.getBroker(),DateUtil.date());
- //infect to other
-// NameserverStartup.targetAddressList
+
+
byteBuf.release();
}
diff --git a/remoting/src/main/java/me/deve/streamq/remoting/ClientMain.java b/remoting/src/main/java/me/deve/streamq/remoting/ClientMain.java
index 1df8a55..ccbaeec 100644
--- a/remoting/src/main/java/me/deve/streamq/remoting/ClientMain.java
+++ b/remoting/src/main/java/me/deve/streamq/remoting/ClientMain.java
@@ -20,6 +20,7 @@
import me.deve.streamq.remoting.handler.ClientHandler;
import me.deve.streamq.remoting.netty.NettyClient;
+import java.net.ConnectException;
import java.net.InetSocketAddress;
/**
@@ -27,7 +28,15 @@
*/
public class ClientMain {
public static void main(String[] args) throws InterruptedException {
+ try {
NettyClient nettyClient = new NettyClient(new NioEventLoopGroup(), new Bootstrap(), new NettyClientConfig(new KryoInetAddress("127.0.0.1",8810)), new ClientHandler());
nettyClient.start();
+ } catch (Exception e) {
+ if(e instanceof ConnectException){
+ e.printStackTrace();
+ }
+
+
+ }
}
}
diff --git a/remoting/src/main/java/me/deve/streamq/remoting/netty/NettyClient.java b/remoting/src/main/java/me/deve/streamq/remoting/netty/NettyClient.java
index ad8fa2d..8380f9d 100644
--- a/remoting/src/main/java/me/deve/streamq/remoting/netty/NettyClient.java
+++ b/remoting/src/main/java/me/deve/streamq/remoting/netty/NettyClient.java
@@ -13,6 +13,7 @@
import me.deve.streamq.common.address.KryoInetAddress;
import me.deve.streamq.common.config.NettyClientConfig;
+import java.net.ConnectException;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -110,6 +111,7 @@ public void startMultiple(List kryoInetAddressList){
});
});
}
+
public void shutdown(){
eventExecutors.shutdownGracefully();
}