Skip to content

Commit

Permalink
refactor:调整 grpc 部分逻辑实现&补充日志打印 (#68)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuntaojun authored Jul 16, 2024
1 parent 4bce62a commit 0a9f3d0
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 135 deletions.
88 changes: 43 additions & 45 deletions balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,21 +163,22 @@ func buildAddressKey(addr resolver.Address) string {
return fmt.Sprintf("%s", addr.Addr)
}

func (p *polarisNamingBalancer) createSubConnection(addr resolver.Address) {
key := buildAddressKey(addr)
func (p *polarisNamingBalancer) createSubConnection(key string, addr resolver.Address) {
p.rwMutex.Lock()
defer p.rwMutex.Unlock()
if _, ok := p.subConns[key]; ok {
return
}
// is a new address (not existing in b.subConns).
sc, err := p.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{HealthCheckEnabled: true})
sc, err := p.cc.NewSubConn(
[]resolver.Address{addr}, balancer.NewSubConnOptions{HealthCheckEnabled: true})
if err != nil {
GetLogger().Warn("[Polaris][Balancer] failed to create new SubConn: %v", err)
GetLogger().Error("[Polaris][Balancer] failed to create new SubConn: %v", err)
return
}
p.subConns[key] = sc
p.scStates[sc] = connectivity.Idle
p.csEvltr.RecordTransition(connectivity.Shutdown, connectivity.Idle)
sc.Connect()
}

Expand All @@ -199,37 +200,37 @@ func (p *polarisNamingBalancer) UpdateClientConnState(state balancer.ClientConnS
}
GetLogger().Debug("[Polaris][Balancer] got new ClientConn state: ", state)
if len(state.ResolverState.Addresses) == 0 {
GetLogger().Error("[Polaris][Balancer] receive empty addresses, host=%s", p.host)
p.ResolverError(errors.New("produced zero addresses"))
return balancer.ErrBadResolverState
}
if nil == p.consumerAPI {
sdkCtx, err := PolarisContext()
if nil != err {
return err
}
p.consumerAPI = polaris.NewConsumerAPIByContext(sdkCtx)
p.routerAPI = polaris.NewRouterAPIByContext(sdkCtx)
p.consumerAPI = polaris.NewConsumerAPIByContext(p.options.SDKContext)
p.routerAPI = polaris.NewRouterAPIByContext(p.options.SDKContext)
}
// Successful resolution; clear resolver error and ensure we return nil.
p.resolverErr = nil
// addressSet is the set converted from address;
// it's used for a quick lookup of an address.
addressSet := make(map[string]struct{})
for _, a := range state.ResolverState.Addresses {
addressSet[buildAddressKey(a)] = struct{}{}
p.createSubConnection(a)
key := buildAddressKey(a)
addressSet[key] = struct{}{}
p.createSubConnection(key, a)
}
p.rwMutex.Lock()
defer p.rwMutex.Unlock()
for a, sc := range p.subConns {
// a way removed by resolver.
if _, ok := addressSet[a]; !ok {
p.cc.RemoveSubConn(sc)
delete(p.subConns, a)
sc.Shutdown()
// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
// The entry will be deleted in HandleSubConnStateChange.
}
}
p.regeneratePicker(p.options)
p.cc.UpdateState(balancer.State{ConnectivityState: p.state, Picker: p.v2Picker})
return nil
}

Expand All @@ -255,37 +256,35 @@ func (p *polarisNamingBalancer) ResolverError(err error) {
func (p *polarisNamingBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
s := state.ConnectivityState
GetLogger().Info("[Polaris][Balancer] handle SubConn state change: %p, %v", sc, s)
oldS, quit := func() (connectivity.State, bool) {
p.rwMutex.Lock()
defer p.rwMutex.Unlock()
oldS, ok := p.scStates[sc]
if !ok {
GetLogger().Info("[Polaris][Balancer] got state changes for an unknown SubConn: %p, %v", sc, s)
return connectivity.TransientFailure, true
}
if oldS == connectivity.TransientFailure && s == connectivity.Connecting {
// Once a subconn enters TRANSIENT_FAILURE, ignore subsequent
// CONNECTING transitions to prevent the aggregated state from being
// always CONNECTING when many backends exist but are all down.
return oldS, true
}
p.scStates[sc] = s
switch s {
case connectivity.Idle:
p.rwMutex.Lock()
defer p.rwMutex.Unlock()
oldS, ok := p.scStates[sc]
if !ok {
GetLogger().Info("[Polaris][Balancer] got state changes for an unknown SubConn: %p, %v", sc, s)
return
}
if oldS == connectivity.TransientFailure &&
(s == connectivity.Connecting || s == connectivity.Idle) {
// Once a subconn enters TRANSIENT_FAILURE, ignore subsequent IDLE or
// CONNECTING transitions to prevent the aggregated state from being
// always CONNECTING when many backends exist but are all down.
if s == connectivity.Idle {
sc.Connect()
case connectivity.Shutdown:
// When an address was removed by resolver, b called RemoveSubConn but
// kept the sc's state in scStates. Remove state for this sc here.
delete(p.scStates, sc)
case connectivity.TransientFailure:
// Save error to be reported via picker.
p.connErr = state.ConnectionError
}
return oldS, false
}()
if quit {
return
}
p.scStates[sc] = s
switch s {
case connectivity.Idle:
sc.Connect()
case connectivity.Shutdown:
// When an address was removed by resolver, b called RemoveSubConn but
// kept the sc's state in scStates. Remove state for this sc here.
delete(p.scStates, sc)
case connectivity.TransientFailure:
// Save error to be reported via picker.
p.connErr = state.ConnectionError
}
p.state = p.csEvltr.RecordTransition(oldS, s)

// Regenerate picker when one of the following happens:
Expand Down Expand Up @@ -336,7 +335,7 @@ func (p *polarisNamingBalancer) regeneratePicker(options *dialOptions) {
readySCs: readySCs,
options: options,
lbCfg: p.lbCfg,
response: &copyR,
insList: &copyR,
}
p.v2Picker = picker
}
Expand All @@ -360,7 +359,7 @@ type polarisNamingPicker struct {
readySCs map[string]balancer.SubConn
options *dialOptions
lbCfg *LBConfig
response *model.InstancesResponse
insList *model.InstancesResponse
}

func buildSourceInfo(options *dialOptions) *model.ServiceInfo {
Expand Down Expand Up @@ -407,7 +406,7 @@ func (pnp *polarisNamingPicker) Pick(info balancer.PickInfo) (balancer.PickResul

if pnp.options.Route {
request := &polaris.ProcessRoutersRequest{}
request.DstInstances = pnp.response
request.DstInstances = pnp.insList
if sourceService != nil {
// 如果在Conf中配置了SourceService,则优先使用配置
request.SourceService = *sourceService
Expand All @@ -424,7 +423,7 @@ func (pnp *polarisNamingPicker) Pick(info balancer.PickInfo) (balancer.PickResul
return balancer.PickResult{}, err
}
} else {
resp = pnp.response
resp = pnp.insList
}

lbReq := pnp.buildLoadBalanceRequest(info, resp)
Expand Down Expand Up @@ -523,7 +522,6 @@ func (pnp *polarisNamingPicker) addTrafficLabels(info balancer.PickInfo, insReq
}
}
}

return nil
}

Expand Down
9 changes: 5 additions & 4 deletions examples/quickstart/consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

polaris "github.com/polarismesh/grpc-go-polaris"
"github.com/polarismesh/grpc-go-polaris/examples/common/pb"
"github.com/polarismesh/polaris-go/api"
)

const (
Expand Down Expand Up @@ -70,16 +71,16 @@ func main() {
ctx = metadata.AppendToOutgoingContext(ctx, "uid", r.Header.Get("uid"))

// 请求时设置本次请求的负载均衡算法
// ctx = polaris.RequestScopeLbPolicy(ctx, api.LBPolicyRingHash)
// ctx = polaris.RequestScopeLbHashKey(ctx, r.Header.Get("uid"))
ctx = polaris.RequestScopeLbPolicy(ctx, api.LBPolicyRingHash)
ctx = polaris.RequestScopeLbHashKey(ctx, r.Header.Get("uid"))
resp, err := echoClient.Echo(ctx, &pb.EchoRequest{Value: value})
log.Printf("send message, resp (%v), err(%v)", resp, err)
if nil != err {
w.WriteHeader(500)
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
w.WriteHeader(200)
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(resp.GetValue()))
}
http.HandleFunc("/echo", indexHandler)
Expand Down
Loading

0 comments on commit 0a9f3d0

Please sign in to comment.