Skip to content

Commit

Permalink
负载均衡策略,轮询 hash 一致性hash
Browse files Browse the repository at this point in the history
  • Loading branch information
dengminchuan committed Aug 26, 2023
1 parent 43c2268 commit 4339a14
Show file tree
Hide file tree
Showing 21 changed files with 409 additions and 136 deletions.
68 changes: 68 additions & 0 deletions .idea/inspectionProfiles/Project_Default.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions .idea/leetcode/editor.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions .idea/leetcode/statistics.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions .idea/nowcoder/editor.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,6 @@ else if(messageType ==FunctionMessageType.PULL_MESSAGE){

}

private FunctionMessage getFunctionMessage(byte[] array) {
KryoSerializer kryoSerializer = new KryoSerializer();
return kryoSerializer.deserialize(array, FunctionMessage.class);
}

public byte[] getMessageBytes(Object msg){
ByteBuf byteBuf= (ByteBuf) msg;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
//-*- coding =utf-8 -*-
//@Time : 2023/6/29
//@Author: 邓闽川
//@File ClientUtil.java
//@software:IntelliJ IDEA
package me.deve.streamq.client.loadbalance;

import cn.hutool.core.util.RandomUtil;
import lombok.extern.slf4j.Slf4j;
import me.deve.streamq.client.msgstrategy.BrokerChooseStrategy;
import me.deve.streamq.client.producer.DefaultMQProducer;
import me.deve.streamq.common.address.KryoInetAddress;
import me.deve.streamq.common.component.Broker;
import me.deve.streamq.common.message.Message;


import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;


@Slf4j
public class BrokerLoadBalance {

private DefaultMQProducer producer;
/**
* RoundRobin offset
*/
private int offset=0;

private ConsistentHashingWithVirtualNode consistentHashingWithVirtualNode;


public BrokerLoadBalance(){

}
public BrokerLoadBalance(DefaultMQProducer producer){
this.producer=producer;
}
public KryoInetAddress chooseBroker(BrokerChooseStrategy strategy, String topicName, Message message){
List<Broker> brokers = producer.topicRouteData.get(topicName);
return
switch (strategy) {
case ROUND_ROBIN-> chooseBrokerByRoundRobin(brokers);
case RANDOM -> chooseBrokerByRandom(brokers);
case HASH -> chooseBrokerByHash(brokers);
case CONSISTENCY_HASH -> chooseBrokerByConsistencyHash(brokers,message);
};
}

/**
* 一致性hash算法,即使broker数量发生变化前面的映射也不会受到影响
* 1.节点数量发生变化时尽量保证相同线程相同tags消息发送到同一节点,单tag 单线程发送没必要使用此负载均衡方式
* 2.只有少量生产者会被重定向到不同broker节点,broker端压力不会突然改变
* 3.使负载均衡更加平滑
* 4.在分布式缓存中使用效果最好,扩容缩容时防止过多数据迁移和缓存雪崩
* 缺点:需要的消耗较大
* @param brokers
* @return
*/
private KryoInetAddress chooseBrokerByConsistencyHash(List<Broker> brokers,Message message) {
Broker broker = consistentHashingWithVirtualNode.getNode(message, brokers);
return broker.getInetAddress();
}

private KryoInetAddress chooseBrokerByHash(List<Broker> brokers) {
try {
String hostAddress = InetAddress.getLocalHost().getHostAddress();
int hashCode = hash(hostAddress);
int size = brokers.size();
return brokers.get(hashCode%size).getInetAddress();
} catch (UnknownHostException e) {
log.error("获取本机ip错误");
}

return null;
}

private KryoInetAddress chooseBrokerByRandom(List<Broker> brokers) {
if(brokers==null){
return null;
}
int index = RandomUtil.randomInt(0, brokers.size());
return brokers.get(index).getInetAddress();
}

/**
* 轮询
* @param brokers
* @return
*/
private KryoInetAddress chooseBrokerByRoundRobin(List<Broker> brokers) {
offset%=brokers.size();
return brokers.get(offset++).getInetAddress();

}
public static int hash(Object obj) {
return obj.hashCode()>>16^obj.hashCode();
}



}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
//-*- coding =utf-8 -*-
//@Time : 2023/8/26
//@Author: 邓闽川
//@File ConsistentHashingWithVirtualNode.java
//@software:IntelliJ IDEA
package me.deve.streamq.client.loadbalance;

import me.deve.streamq.common.component.Broker;
import me.deve.streamq.common.message.Message;

import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

public class ConsistentHashingWithVirtualNode {
private List<Broker> brokers;

private final SortedMap<Integer,VirtualBroker> hashRing=new TreeMap<>();

private final int VIRTUAL_NODE_NUMBER=10;




public ConsistentHashingWithVirtualNode(List<Broker> brokers){
this.brokers = brokers;
}

public void buildHashRing(List<Broker> brokers){
for (Broker broker : brokers) {
for(int i=0;i<VIRTUAL_NODE_NUMBER;i++){
VirtualBroker virtualBroker = new VirtualBroker(broker, i);
int hash = BrokerLoadBalance.hash(virtualBroker);
hashRing.put(hash, virtualBroker);
}
}
}

/**\
* 获取应该发送到的节点
* @return
*/
public Broker getNode(Message message,List<Broker> brokers){
//hashRing发生了变化就重建
if(this.brokers==null||!this.brokers.equals(brokers)){
this.brokers=brokers;
buildHashRing(brokers);
}
//计算key
int key1 = BrokerLoadBalance.hash(message.getTags());
int key2=BrokerLoadBalance.hash(Thread.currentThread());
int key=key1^key2;
SortedMap<Integer, VirtualBroker> subMap = hashRing.tailMap(key);
//说明是第一个
if(subMap.isEmpty()){
VirtualBroker virtualBroker = hashRing.get(hashRing.firstKey());
return virtualBroker.getBroker();
}else{
VirtualBroker virtualBroker = hashRing.get(subMap.firstKey());
return virtualBroker.getBroker();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
//-*- coding =utf-8 -*-
//@Time : 2023/8/26
//@Author: 邓闽川
//@File VitualBroker.java
//@software:IntelliJ IDEA
package me.deve.streamq.client.loadbalance;

import lombok.Getter;
import me.deve.streamq.common.component.Broker;

public class VirtualBroker {
@Getter
private Broker broker;

private Integer virtual_id;

public VirtualBroker(Broker broker, int i) {
this.broker=broker;
this.virtual_id=i;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
public enum BrokerChooseStrategy {
ROUND_ROBIN,
RANDOM,
HASH
HASH,
CONSISTENCY_HASH

}
Loading

0 comments on commit 4339a14

Please sign in to comment.