respect: rpc框架: https://doc.rpcx.io/
服务发现,注册中心,服务治理,限流熔断隔离降级,codec等
-
一般的,一个rpc框架就是一个微服务框架
-
一个好的协议,request和response应该是同样的格式
-
插件化与回调
-
服务发现
- 点对点
- 注册中心
-
服务选择
- 重试策略
- 节点选择策略
-
限流熔断,隔离降级
-
编解码codec
- 不同的序列化手段
-
服务监控
- trace:调用链追踪
- logging:日志
- metric:指标,统计分析
-
服务发现
-
类似DNS,是一个kv数据库,完成servicName到ip:port的映射
-
点对点
- 直接指定对端ip:port,dial对端,不需要服务发现
-
点对多
- 同点对点,但指定了多个对端ip:port,它们将提供同样的服务,客户端在此模式下可以有不同的重试策略。
-
注册中心: zookeeper , etcd , consul
-
服务注册中心用来实现服务发现和服务的元数据存储(比如serviceName到多个ip:port的映射)。
-
传统的服务发现可能直接由静态配置文件设置,并且可以运行时动态监听文件修改并重新读入并应用。
-
更现代的方式是拥有一个注册中心,我们不再维护本地的配置文件,好处是注册中心是中心化管理,多个客户端共享。
-
注册中心都实现了某种分布式共识算法(指注册中心本身是分布式的(比如一个部署好的zookeeper集群),保证其某个节点失效仍可正常运行),其本质就是一个分布式键值数据库,如etcd
-
此模式下,使用rpc时,不再需要指定服务主机地址,而替换为注册中心集群地址
-
一般的,客户端将会向注册中心订阅,这样服务的动态变化将会异步通知到客户端。而不是客户端每次请求都去访问注册中心
-
-
服务选择:
- 失败模式(重试模式):当遇到超时或网络错误,该怎么办?
- 直接失败
- 重试其他节点
- 重试当前节点
- 广播一定数量的目标节点,有一个成功就算成功
- 节点选择
- 随机
- roundrobin(顺序调用)
- weightedRoundRobin(在一个周期内,权值高的调用次数多,且较均匀的分布在周期内)
- 本质也是生成一个调用队列,依次出队
- 网络质量优先(基于ICMP ping)
- 也要防止网络状态不好的服务主机一直饥饿
- 一致性哈希
- 指满足均衡性,单调性,分散性,低负载的哈希算法,该算法将hash值空间组织成虚拟的环
- 首先将服务主机的ip:port计算出哈希值,store进哈希表
- 然后客户端对serviceName:serviceMethod:args计算出哈希值,将该值在环上按一定方向移动,第一个遇到的主机就是选中的主机
- 地理位置优先(计算经纬度)
- 自定义
-
限流:rateLimit
-
目的:有损服务,而不是不服务
-
限流对象
- TCP连接请求
- 一般无法限制tcp的建立,除非中间加一层代理网关
- QPS:连接建立后,是否被处理
- TCP连接请求
-
限流处理
- 返回错误码,比如http常见的500 internal error
- 服务端阻塞等待一段时间,看能否在超时时间内被处理
-
常见算法:
-
固定窗口计数器
- 比如每分钟为一个窗口(一般以整分钟开始1分钟到2分钟一个窗口),限制每个窗口内最多1000个连接
- 缺点:对于随机选取的时间长度为1分钟的区间(比如1.5分钟到2.5分钟),不一定满足连接数小于1000
-
滑动窗口计数器
- 固定窗口相当于长度为1的滑动窗口
- 比如以每秒钟为一个窗口,设置滑动窗口的长度为60,要求每分钟最多1000个连接。
- 每过1秒钟,滑动窗口向前移动一个小窗口,每个小窗口将维护一个计数,记录这个小窗口的时间期间到来的连接数
- 新的连接能否在新的小的时间窗口内被接收,取决于的逻辑的长度为60的滑动窗口内的所有小窗口记录的连接数之和是否大于1000
-
令牌桶 token bucket
-
维护一个有大小的令牌桶,若桶未满,则以一定的速率生成令牌放入桶中
-
每个请求必须在申请到令牌后,才会被处理,否则限流
-
原生令牌桶是基于字节数判断一个packet是否有效,即限制的是读写的byte数
-
一个限流器实现: https://github.com/juju/ratelimit, 其reader/writer实现:
-
func (r *reader) Read(buf []byte) (int, error) { n, err := r.r.Read(buf) if n <= 0 { return n, err } r.bucket.Wait(int64(n)) return n, err } func (w *writer) Write(buf []byte) (int, error) { w.bucket.Wait(int64(len(buf))) return w.w.Write(buf) }
-
-
实际上也可以用于直接限制连接:
-
// rpcx的限流插件:实现了PostConnAcceptPlugin接口 // PostConnAcceptPlugin interface { // HandleConnAccept(net.Conn) (net.Conn, bool) // } func (plugin *RateLimitingPlugin) HandleConnAccept(conn net.Conn) (net.Conn, bool) { return conn, plugin.bucket.TakeAvailable(1) > 0 }
-
-
-
漏桶
- 维持一个固定大小的连接队列,以恒定的速率出队
-
-
-
熔断: circuit breaker(断路器)
- 熔断属于服务作为客户端时的行为
- 当对一个节点的调用出现连续的错误时,断路器将打开,后续对该节点的调用将直接返回错误。一定时间后断路器半开,允许一定数量的请求,若正常访问则全开,否则继续断开
- 这主要是为了防止大量的请求处于请求发出而未超时的等待阶段,若这个客户端本身作为服务,则也会影响自身的服务提供,导致雪崩
- 因为资源是有限的,一个goroutine要2k的栈,再加上1k的recv buffer等等
-
降级
- 服务降级:本质就是提供有损服务
- 限流和熔断都属于服务降级
-
隔离
- 将本机的各个服务隔离开,这也是docker这类容器的优点:隔离
- 实现上,就是对资源的获取是有限度的,比如设置最大的goroutine数,这可以通过线程池做到
常见的编解码器,即对对象的序列化和反序列化功能
- binary
- json
- 对性能要求不高的场景,可读性高
- protobuf
- google出品
- messagePack
rpcx提供了多种回调接口,只要插件实现了这些接口,再注册进插件中心即可在合适的地方被调用
比如限流插件,我们期望其在连接建立后被调用,因此要实现HandleConnAccept(conn net.Conn) (net.Conn, bool)
方法
type (
// ... 省略一部分
// PostConnAcceptPlugin represents connection accept plugin.
// if returns false, it means subsequent IPostConnAcceptPlugins should not continue to handle this conn
// and this conn has been closed.
PostConnAcceptPlugin interface {
HandleConnAccept(net.Conn) (net.Conn, bool)
}
// PostConnClosePlugin represents client connection close plugin.
PostConnClosePlugin interface {
HandleConnClose(net.Conn) bool
}
// PreReadRequestPlugin represents .
PreReadRequestPlugin interface {
PreReadRequest(ctx context.Context) error
}
// PostReadRequestPlugin represents .
PostReadRequestPlugin interface {
PostReadRequest(ctx context.Context, r *protocol.Message, e error) error
}
// ...省略一部分
)
插件中心将会在合适的地方调用注册好的插件,比如read前后的回调:
func (s *Server) readRequest(ctx context.Context, r io.Reader) (req *protocol.Message, err error) {
// here callback
err = s.Plugins.DoPreReadRequest(ctx)
if err != nil {
return nil, err
}
// pool req?
req = protocol.GetPooledMsg()
err = req.Decode(r)
if err == io.EOF {
return req, err
}
// here callback
perr := s.Plugins.DoPostReadRequest(ctx, req, err)
if err == nil {
err = perr
}
return req, err
}
一个朴素的插件中心的实现,将会把不同的插件无差别的放进一个[]interface{},调用时再遍历一个个type assertion,看是否是想要的接口,这也是rpcx默认的插件中心的实现方法
//DoPostConnAccept handles accepted conn
func (p *pluginContainer) DoPostConnAccept(conn net.Conn) (net.Conn, bool) {
var flag bool
for i := range p.plugins {
if plugin, ok := p.plugins[i].(PostConnAcceptPlugin); ok {
conn, flag = plugin.HandleConnAccept(conn)
if !flag { //interrupt
conn.Close()
return conn, false
}
}
}
return conn, true
}
service mesh
- 分为数据面和控制面,用户只需编写数据面即可