diff --git a/balancer.go b/balancer.go index f901117..2b1ae20 100644 --- a/balancer.go +++ b/balancer.go @@ -163,21 +163,22 @@ func buildAddressKey(addr resolver.Address) string { return fmt.Sprintf("%s", addr.Addr) } -func (p *polarisNamingBalancer) createSubConnection(addr resolver.Address) { - key := buildAddressKey(addr) +func (p *polarisNamingBalancer) createSubConnection(key string, addr resolver.Address) { p.rwMutex.Lock() defer p.rwMutex.Unlock() if _, ok := p.subConns[key]; ok { return } // is a new address (not existing in b.subConns). - sc, err := p.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{HealthCheckEnabled: true}) + sc, err := p.cc.NewSubConn( + []resolver.Address{addr}, balancer.NewSubConnOptions{HealthCheckEnabled: true}) if err != nil { - GetLogger().Warn("[Polaris][Balancer] failed to create new SubConn: %v", err) + GetLogger().Error("[Polaris][Balancer] failed to create new SubConn: %v", err) return } p.subConns[key] = sc p.scStates[sc] = connectivity.Idle + p.csEvltr.RecordTransition(connectivity.Shutdown, connectivity.Idle) sc.Connect() } @@ -199,16 +200,13 @@ func (p *polarisNamingBalancer) UpdateClientConnState(state balancer.ClientConnS } GetLogger().Debug("[Polaris][Balancer] got new ClientConn state: ", state) if len(state.ResolverState.Addresses) == 0 { + GetLogger().Error("[Polaris][Balancer] receive empty addresses, host=%s", p.host) p.ResolverError(errors.New("produced zero addresses")) return balancer.ErrBadResolverState } if nil == p.consumerAPI { - sdkCtx, err := PolarisContext() - if nil != err { - return err - } - p.consumerAPI = polaris.NewConsumerAPIByContext(sdkCtx) - p.routerAPI = polaris.NewRouterAPIByContext(sdkCtx) + p.consumerAPI = polaris.NewConsumerAPIByContext(p.options.SDKContext) + p.routerAPI = polaris.NewRouterAPIByContext(p.options.SDKContext) } // Successful resolution; clear resolver error and ensure we return nil. p.resolverErr = nil @@ -216,20 +214,23 @@ func (p *polarisNamingBalancer) UpdateClientConnState(state balancer.ClientConnS // it's used for a quick lookup of an address. addressSet := make(map[string]struct{}) for _, a := range state.ResolverState.Addresses { - addressSet[buildAddressKey(a)] = struct{}{} - p.createSubConnection(a) + key := buildAddressKey(a) + addressSet[key] = struct{}{} + p.createSubConnection(key, a) } p.rwMutex.Lock() defer p.rwMutex.Unlock() for a, sc := range p.subConns { // a way removed by resolver. if _, ok := addressSet[a]; !ok { - p.cc.RemoveSubConn(sc) delete(p.subConns, a) + sc.Shutdown() // Keep the state of this sc in b.scStates until sc's state becomes Shutdown. // The entry will be deleted in HandleSubConnStateChange. } } + p.regeneratePicker(p.options) + p.cc.UpdateState(balancer.State{ConnectivityState: p.state, Picker: p.v2Picker}) return nil } @@ -255,37 +256,35 @@ func (p *polarisNamingBalancer) ResolverError(err error) { func (p *polarisNamingBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { s := state.ConnectivityState GetLogger().Info("[Polaris][Balancer] handle SubConn state change: %p, %v", sc, s) - oldS, quit := func() (connectivity.State, bool) { - p.rwMutex.Lock() - defer p.rwMutex.Unlock() - oldS, ok := p.scStates[sc] - if !ok { - GetLogger().Info("[Polaris][Balancer] got state changes for an unknown SubConn: %p, %v", sc, s) - return connectivity.TransientFailure, true - } - if oldS == connectivity.TransientFailure && s == connectivity.Connecting { - // Once a subconn enters TRANSIENT_FAILURE, ignore subsequent - // CONNECTING transitions to prevent the aggregated state from being - // always CONNECTING when many backends exist but are all down. - return oldS, true - } - p.scStates[sc] = s - switch s { - case connectivity.Idle: + p.rwMutex.Lock() + defer p.rwMutex.Unlock() + oldS, ok := p.scStates[sc] + if !ok { + GetLogger().Info("[Polaris][Balancer] got state changes for an unknown SubConn: %p, %v", sc, s) + return + } + if oldS == connectivity.TransientFailure && + (s == connectivity.Connecting || s == connectivity.Idle) { + // Once a subconn enters TRANSIENT_FAILURE, ignore subsequent IDLE or + // CONNECTING transitions to prevent the aggregated state from being + // always CONNECTING when many backends exist but are all down. + if s == connectivity.Idle { sc.Connect() - case connectivity.Shutdown: - // When an address was removed by resolver, b called RemoveSubConn but - // kept the sc's state in scStates. Remove state for this sc here. - delete(p.scStates, sc) - case connectivity.TransientFailure: - // Save error to be reported via picker. - p.connErr = state.ConnectionError } - return oldS, false - }() - if quit { return } + p.scStates[sc] = s + switch s { + case connectivity.Idle: + sc.Connect() + case connectivity.Shutdown: + // When an address was removed by resolver, b called RemoveSubConn but + // kept the sc's state in scStates. Remove state for this sc here. + delete(p.scStates, sc) + case connectivity.TransientFailure: + // Save error to be reported via picker. + p.connErr = state.ConnectionError + } p.state = p.csEvltr.RecordTransition(oldS, s) // Regenerate picker when one of the following happens: @@ -336,7 +335,7 @@ func (p *polarisNamingBalancer) regeneratePicker(options *dialOptions) { readySCs: readySCs, options: options, lbCfg: p.lbCfg, - response: ©R, + insList: ©R, } p.v2Picker = picker } @@ -360,7 +359,7 @@ type polarisNamingPicker struct { readySCs map[string]balancer.SubConn options *dialOptions lbCfg *LBConfig - response *model.InstancesResponse + insList *model.InstancesResponse } func buildSourceInfo(options *dialOptions) *model.ServiceInfo { @@ -407,7 +406,7 @@ func (pnp *polarisNamingPicker) Pick(info balancer.PickInfo) (balancer.PickResul if pnp.options.Route { request := &polaris.ProcessRoutersRequest{} - request.DstInstances = pnp.response + request.DstInstances = pnp.insList if sourceService != nil { // 如果在Conf中配置了SourceService,则优先使用配置 request.SourceService = *sourceService @@ -424,7 +423,7 @@ func (pnp *polarisNamingPicker) Pick(info balancer.PickInfo) (balancer.PickResul return balancer.PickResult{}, err } } else { - resp = pnp.response + resp = pnp.insList } lbReq := pnp.buildLoadBalanceRequest(info, resp) @@ -523,7 +522,6 @@ func (pnp *polarisNamingPicker) addTrafficLabels(info balancer.PickInfo, insReq } } } - return nil } diff --git a/examples/quickstart/consumer/main.go b/examples/quickstart/consumer/main.go index 5619bd1..3f430c6 100644 --- a/examples/quickstart/consumer/main.go +++ b/examples/quickstart/consumer/main.go @@ -29,6 +29,7 @@ import ( polaris "github.com/polarismesh/grpc-go-polaris" "github.com/polarismesh/grpc-go-polaris/examples/common/pb" + "github.com/polarismesh/polaris-go/api" ) const ( @@ -70,16 +71,16 @@ func main() { ctx = metadata.AppendToOutgoingContext(ctx, "uid", r.Header.Get("uid")) // 请求时设置本次请求的负载均衡算法 - // ctx = polaris.RequestScopeLbPolicy(ctx, api.LBPolicyRingHash) - // ctx = polaris.RequestScopeLbHashKey(ctx, r.Header.Get("uid")) + ctx = polaris.RequestScopeLbPolicy(ctx, api.LBPolicyRingHash) + ctx = polaris.RequestScopeLbHashKey(ctx, r.Header.Get("uid")) resp, err := echoClient.Echo(ctx, &pb.EchoRequest{Value: value}) log.Printf("send message, resp (%v), err(%v)", resp, err) if nil != err { - w.WriteHeader(500) + w.WriteHeader(http.StatusInternalServerError) _, _ = w.Write([]byte(err.Error())) return } - w.WriteHeader(200) + w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte(resp.GetValue())) } http.HandleFunc("/echo", indexHandler) diff --git a/resolver.go b/resolver.go index e6cf926..c6fa8cc 100644 --- a/resolver.go +++ b/resolver.go @@ -25,7 +25,6 @@ import ( "fmt" "strconv" "strings" - "sync" "time" "github.com/polarismesh/polaris-go/api" @@ -78,8 +77,8 @@ func targetToOptions(target resolver.Target) (*dialOptions, error) { if len(optionsStr) > 0 { value, err := base64.URLEncoding.DecodeString(optionsStr) if nil != err { - return nil, fmt.Errorf( - "fail to decode endpoint %s, options %s: %w", target.URL.Opaque, optionsStr, err) + GetLogger().Error("[Polaris][Resolver] fail to decode endpoint %s, options %s: %w", target.URL.Opaque, optionsStr, err) + return nil, fmt.Errorf("fail to decode endpoint %s, options %s: %w", target.URL.Opaque, optionsStr, err) } if err = json.Unmarshal(value, options); nil != err { return nil, fmt.Errorf("fail to unmarshal options %s: %w", string(value), err) @@ -104,18 +103,26 @@ func (rb *resolverBuilder) Build( if err != nil { return nil, err } + + sdkCtx, err := PolarisContext() + if nil != err { + return nil, err + } + + options.SDKContext = sdkCtx + ctx, cancel := context.WithCancel(context.Background()) d := &polarisNamingResolver{ - ctx: ctx, - cancel: cancel, - cc: cc, - rn: make(chan struct{}, 1), - target: target, - options: options, - host: host, - port: port, + ctx: ctx, + cancel: cancel, + cc: cc, + target: target, + options: options, + host: host, + port: port, + consumer: api.NewConsumerAPIByContext(sdkCtx), + eventCh: make(chan struct{}, 1), } - d.wg.Add(1) go d.watcher() d.ResolveNow(resolver.ResolveNowOptions{}) return d, nil @@ -141,18 +148,18 @@ type polarisNamingResolver struct { cancel context.CancelFunc cc resolver.ClientConn // rn channel is used by ResolveNow() to force an immediate resolution of the target. - rn chan struct{} - wg sync.WaitGroup - options *dialOptions - target resolver.Target - host string - port int + eventCh chan struct{} + options *dialOptions + target resolver.Target + host string + port int + consumer api.ConsumerAPI } // ResolveNow The method is called by the gRPC framework to resolve the target name func (pr *polarisNamingResolver) ResolveNow(opt resolver.ResolveNowOptions) { // 立即resolve,重新查询服务信息 select { - case pr.rn <- struct{}{}: + case pr.eventCh <- struct{}{}: default: } } @@ -169,16 +176,11 @@ const keyDialOptions = "options" const keyResponse = "response" -func (pr *polarisNamingResolver) lookup() (*resolver.State, api.ConsumerAPI, error) { - sdkCtx, err := PolarisContext() - if nil != err { - return nil, nil, err - } - consumerAPI := api.NewConsumerAPIByContext(sdkCtx) +func (pr *polarisNamingResolver) lookup() (*resolver.State, error) { instancesRequest := &api.GetInstancesRequest{ GetInstancesRequest: model.GetInstancesRequest{ - Service: pr.host, Namespace: getNamespace(pr.options), + Service: pr.host, SkipRouteFilter: true, }, } @@ -186,32 +188,10 @@ func (pr *polarisNamingResolver) lookup() (*resolver.State, api.ConsumerAPI, err instancesRequest.Metadata = pr.options.DstMetadata } - resp, err := consumerAPI.GetInstances(instancesRequest) + resp, err := pr.consumer.GetInstances(instancesRequest) if nil != err { - return nil, consumerAPI, err - } - - updated := false - for _, instance := range resp.Instances { - if !instance.IsHealthy() || instance.IsIsolated() { // 过滤掉不健康和隔离的。 - updated = true - break - } - } - if updated { // 少数情况,避免创建 slice - usedInstances := make([]model.Instance, 0, len(resp.Instances)) - totalWeight := 0 - for _, instance := range resp.Instances { - if !instance.IsHealthy() || instance.IsIsolated() { - continue - } - usedInstances = append(usedInstances, instance) - totalWeight += instance.GetWeight() - } - resp.Instances = usedInstances - resp.TotalWeight = totalWeight + return nil, err } - rc := &ResolverContext{ Target: pr.target, Host: pr.host, @@ -234,61 +214,73 @@ func (pr *polarisNamingResolver) lookup() (*resolver.State, api.ConsumerAPI, err Addr: fmt.Sprintf("%s:%d", instance.GetHost(), instance.GetPort()), }) } - return state, consumerAPI, nil + return state, nil } -func (pr *polarisNamingResolver) doWatch( - consumerAPI api.ConsumerAPI) (model.ServiceKey, <-chan model.SubScribeEvent, error) { - watchRequest := &api.WatchServiceRequest{} - watchRequest.Key = model.ServiceKey{ - Namespace: getNamespace(pr.options), - Service: pr.host, - } - resp, err := consumerAPI.WatchService(watchRequest) - if nil != err { - return watchRequest.Key, nil, err +func (pr *polarisNamingResolver) doWatch() (*model.WatchAllInstancesResponse, error) { + watchRequest := &api.WatchAllInstancesRequest{ + WatchAllInstancesRequest: model.WatchAllInstancesRequest{ + InstancesListener: pr, + WatchMode: model.WatchModeNotify, + ServiceKey: model.ServiceKey{ + Namespace: getNamespace(pr.options), + Service: pr.host, + }, + }, } - return watchRequest.Key, resp.EventChannel, nil + return pr.consumer.WatchAllInstances(watchRequest) } func (pr *polarisNamingResolver) watcher() { - var ( - consumerAPI api.ConsumerAPI - eventChan <-chan model.SubScribeEvent - ) + watchRsp, err := pr.doWatch() + if nil != err { + GetLogger().Error("[Polaris][Resolver] fail to do watch for namespace=%s service=%s: %v", + pr.options.Namespace, pr.host, err) + } + ticker := time.NewTicker(5 * time.Second) defer func() { ticker.Stop() - pr.wg.Done() + watchRsp.CancelWatch() + close(pr.eventCh) }() for { select { case <-pr.ctx.Done(): + GetLogger().Info("[Polaris][Resolver] exist watch instance change event for namespace=%s service=%s: %v", + pr.options.Namespace, pr.host) return - case <-pr.rn: - case <-eventChan: + case <-pr.eventCh: case <-ticker.C: } - var ( - state *resolver.State - err error - ) - state, consumerAPI, err = pr.lookup() - if err != nil { - pr.cc.ReportError(err) - continue - } - if err = pr.cc.UpdateState(*state); nil != err { - GetLogger().Error("fail to do update service %s: %v", pr.target.URL.Host, err) - } - var svcKey model.ServiceKey - svcKey, eventChan, err = pr.doWatch(consumerAPI) - if nil != err { - GetLogger().Error("fail to do watch for service %s: %v", svcKey, err) - } + pr.doRefresh() } } +func (pr *polarisNamingResolver) doRefresh() { + var ( + state *resolver.State + err error + ) + state, err = pr.lookup() + if err != nil { + GetLogger().Error("[Polaris][Resolver] fail to do lookup %s: %v", pr.target.URL.Host, err) + pr.cc.ReportError(err) + return + } + if err = pr.cc.UpdateState(*state); nil != err { + GetLogger().Error("[Polaris][Resolver] fail to do update state %s: %v", pr.target.URL.Host, err) + } +} + +func (pr *polarisNamingResolver) OnInstancesUpdate(rsp *model.InstancesResponse) { + defer func() { + // cover 住 chan 的写入一个 closed 导致的 panic + _ = recover() + }() + pr.eventCh <- struct{}{} +} + // Close resolver closed func (pr *polarisNamingResolver) Close() { pr.cancel()