Skip to content

Commit

Permalink
fix:修复客户端鉴权接口设置 (#203)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuntaojun authored Mar 27, 2024
1 parent f88e184 commit 60ecfda
Show file tree
Hide file tree
Showing 9 changed files with 83 additions and 38 deletions.
4 changes: 4 additions & 0 deletions pkg/config/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,10 @@ type ServerConnectorConfig interface {
GetConnectionIdleTimeout() time.Duration
// SetConnectionIdleTimeout 设置连接会被释放的空闲的时长
SetConnectionIdleTimeout(time.Duration)
// GetToken .
GetToken() string
// SetToken .
SetToken(string)
}

// LocalCacheConfig 本地缓存相关配置项.
Expand Down
12 changes: 12 additions & 0 deletions pkg/config/config_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ type ConfigConnectorConfigImpl struct {

Plugin PluginConfigs `yaml:"plugin" json:"plugin"`

Token string `yaml:"token" json:"token"`

ConnectorType string `yaml:"connectorType" json:"connectorType"`
}

Expand Down Expand Up @@ -158,6 +160,16 @@ func (c *ConfigConnectorConfigImpl) SetConnectorType(connectorType string) {
c.ConnectorType = connectorType
}

// GetToken .
func (c *ConfigConnectorConfigImpl) GetToken() string {
return c.Token
}

// SetToken .
func (c *ConfigConnectorConfigImpl) SetToken(token string) {
c.Token = token
}

// Verify 检验ConfigConnector配置.
func (c *ConfigConnectorConfigImpl) Verify() error {
if nil == c {
Expand Down
14 changes: 14 additions & 0 deletions pkg/config/serverconnector.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type ServerConnectorConfigImpl struct {
ReconnectInterval *time.Duration `yaml:"reconnectInterval" json:"reconnectInterval"`

Plugin PluginConfigs `yaml:"plugin" json:"plugin"`

Token string `yaml:"token" json:"token"`
}

// GetAddresses global.serverConnector.addresses
Expand Down Expand Up @@ -152,6 +154,18 @@ func (s *ServerConnectorConfigImpl) SetPluginConfig(pluginName string, value Bas
return s.Plugin.SetPluginConfig(common.TypeServerConnector, pluginName, value)
}

// GetProtocol global.serverConnector.protocol
// 与cl5 server对接的协议.
func (s *ServerConnectorConfigImpl) GetToken() string {
return s.Token
}

// SetProtocol 设置与cl5 server对接的协议.
func (s *ServerConnectorConfigImpl) SetToken(t string) {
s.Token = t
}


Check failure on line 168 in pkg/config/serverconnector.go

View workflow job for this annotation

GitHub Actions / golangci-lint (1.20.x)

File is not `gci`-ed with --skip-generated -s standard -s default -s prefix(github.com/polarismesh/polaris-go) --custom-order (gci)

Check failure on line 168 in pkg/config/serverconnector.go

View workflow job for this annotation

GitHub Actions / golangci-lint (1.17.x)

File is not `gci`-ed with --skip-generated -s standard -s default -s prefix(github.com/polarismesh/polaris-go) --custom-order (gci)

Check failure on line 168 in pkg/config/serverconnector.go

View workflow job for this annotation

GitHub Actions / golangci-lint (1.18.x)

File is not `gci`-ed with --skip-generated -s standard -s default -s prefix(github.com/polarismesh/polaris-go) --custom-order (gci)
// Verify 检验ServerConnector配置.
func (s *ServerConnectorConfigImpl) Verify() error {
if nil == s {
Expand Down
12 changes: 6 additions & 6 deletions plugin/configconnector/polaris/config_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (c *Connector) GetConfigFile(configFile *configconnector.ConfigFile) (*conf
defer conn.Release(opKey)
configClient := config_manage.NewPolarisConfigGRPCClient(network.ToGRPCConn(conn.Conn))
reqID := connector.NextRegisterInstanceReqID()
ctx, cancel := connector.CreateHeaderContextWithReqId(0, reqID)
ctx, cancel := connector.CreateHeadersContext(0, connector.AppendHeaderWithReqId(reqID))
if cancel != nil {
defer cancel()
}
Expand Down Expand Up @@ -153,7 +153,7 @@ func (c *Connector) WatchConfigFiles(configFileList []*configconnector.ConfigFil
defer conn.Release(opKey)
configClient := config_manage.NewPolarisConfigGRPCClient(network.ToGRPCConn(conn.Conn))
reqID := connector.NextWatchConfigFilesReqID()
ctx, cancel := connector.CreateHeaderContextWithReqId(0, reqID)
ctx, cancel := connector.CreateHeadersContext(0, connector.AppendHeaderWithReqId(reqID))
if cancel != nil {
defer cancel()
}
Expand Down Expand Up @@ -190,7 +190,7 @@ func (c *Connector) CreateConfigFile(configFile *configconnector.ConfigFile) (*c
defer conn.Release(opKey)
configClient := config_manage.NewPolarisConfigGRPCClient(network.ToGRPCConn(conn.Conn))
reqID := connector.NextCreateConfigFileReqID()
ctx, cancel := connector.CreateHeaderContextWithReqId(0, reqID)
ctx, cancel := connector.CreateHeadersContext(0, connector.AppendHeaderWithReqId(reqID))
if cancel != nil {
defer cancel()
}
Expand Down Expand Up @@ -222,7 +222,7 @@ func (c *Connector) UpdateConfigFile(configFile *configconnector.ConfigFile) (*c
defer conn.Release(opKey)
configClient := config_manage.NewPolarisConfigGRPCClient(network.ToGRPCConn(conn.Conn))
reqID := connector.NextUpdateConfigFileReqID()
ctx, cancel := connector.CreateHeaderContextWithReqId(0, reqID)
ctx, cancel := connector.CreateHeadersContext(0, connector.AppendHeaderWithReqId(reqID))
if cancel != nil {
defer cancel()
}
Expand Down Expand Up @@ -254,7 +254,7 @@ func (c *Connector) PublishConfigFile(configFile *configconnector.ConfigFile) (*
defer conn.Release(opKey)
configClient := config_manage.NewPolarisConfigGRPCClient(network.ToGRPCConn(conn.Conn))
reqID := connector.NextPublishConfigFileReqID()
ctx, cancel := connector.CreateHeaderContextWithReqId(0, reqID)
ctx, cancel := connector.CreateHeadersContext(0, connector.AppendHeaderWithReqId(reqID))
if cancel != nil {
defer cancel()
}
Expand Down Expand Up @@ -283,7 +283,7 @@ func (c *Connector) GetConfigGroup(req *configconnector.ConfigGroup) (*configcon
}

reqID := connector.NextPublishConfigFileReqID()
ctx, cancel := connector.CreateHeaderContextWithReqId(0, reqID)
ctx, cancel := connector.CreateHeadersContext(0, connector.AppendHeaderWithReqId(reqID))
if cancel != nil {
defer cancel()
}
Expand Down
10 changes: 8 additions & 2 deletions plugin/serverconnector/common/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ type DiscoverClient interface {
CloseSend() error
}

type DiscoverClientCreatorArgs struct {
ReqId string
AuthToken string
Connection *network.Connection
Timeout time.Duration
}

// DiscoverClientCreator 创建client的函数
type DiscoverClientCreator func(
reqId string, connection *network.Connection, timeout time.Duration) (DiscoverClient, context.CancelFunc, error)
type DiscoverClientCreator func(args *DiscoverClientCreatorArgs) (DiscoverClient, context.CancelFunc, error)
20 changes: 17 additions & 3 deletions plugin/serverconnector/common/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ import (
const (
// 需要发往服务端的请求跟踪标识
headerRequestID = "request-id"
//
headerAuthToken = "X-Polaris-Token"
// 失败时的最大超时时间
maxConnTimeout = 100 * time.Millisecond
// 任务重试间隔
Expand All @@ -68,6 +70,8 @@ type DiscoverConnector struct {
ServiceConnector *plugin.PluginBase
connectionIdleTimeout time.Duration
messageTimeout time.Duration
// authToken
authToken string
// 普通任务队列
taskChannel chan *clientTask
// 高优先级重试任务队列,只会在系统服务未ready时候会往队列塞值
Expand Down Expand Up @@ -100,6 +104,7 @@ type clientTask struct {
// Init 初始化插件
func (g *DiscoverConnector) Init(ctx *plugin.InitContext, createClient DiscoverClientCreator) {
ctxConfig := ctx.Config
g.authToken = ctxConfig.GetGlobal().GetServerConnector().GetToken()
g.RunContext = common.NewRunContext()
g.scalableRand = rand.NewScalableRand()
g.discoverKey.Namespace = ctxConfig.GetGlobal().GetSystem().GetDiscoverCluster().GetNamespace()
Expand Down Expand Up @@ -587,8 +592,12 @@ func (g *DiscoverConnector) newStream(task *serviceUpdateTask) (streamingClient
goto finally
}
streamingClient.reqID = NextDiscoverReqID()
streamingClient.discoverClient, streamingClient.cancel, err = g.createClient(streamingClient.reqID,
streamingClient.connection, 0)
streamingClient.discoverClient, streamingClient.cancel, err = g.createClient(&DiscoverClientCreatorArgs{
ReqId: streamingClient.reqID,
Connection: streamingClient.connection,
Timeout: 0,
AuthToken: g.authToken,
})
if err != nil {
log.GetNetworkLogger().Errorf("%s, newStream: fail to get streaming client from %s, reqID %s, err %v",
g.ServiceConnector.GetSDKContextID(), streamingClient.connection, streamingClient.reqID, err)
Expand Down Expand Up @@ -953,7 +962,12 @@ func (g *DiscoverConnector) syncUpdateTask(task *serviceUpdateTask) error {
}
defer connection.Release(OpKeyDiscover)
reqID := NextDiscoverReqID()
discoverClient, cancel, err := g.createClient(reqID, connection, g.messageTimeout)
discoverClient, cancel, err := g.createClient(&DiscoverClientCreatorArgs{
ReqId: reqID,
Connection: connection,
Timeout: g.messageTimeout,
AuthToken: g.authToken,
})
if cancel != nil {
defer cancel()
}
Expand Down
31 changes: 12 additions & 19 deletions plugin/serverconnector/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,12 @@ func GetUpdateTaskRequestTime(updateTask *serviceUpdateTask) time.Duration {
// return metadata.NewOutgoingContext(ctx, md)
// }

// CreateHeaderContext 创建传输grpc头的valueContext
func CreateHeaderContext(timeout time.Duration, headers map[string]string) (context.Context, context.CancelFunc) {
func CreateHeadersContext(timeout time.Duration, options ...func(map[string]string)) (context.Context, context.CancelFunc) {
headers := map[string]string{}
for _, option := range options {
option(headers)
}

md := metadata.New(headers)
var ctx context.Context
var cancel context.CancelFunc
Expand All @@ -188,25 +192,14 @@ func CreateHeaderContext(timeout time.Duration, headers map[string]string) (cont
return metadata.NewOutgoingContext(ctx, md), cancel
}

// CreateHeaderContextWithReqId 创建传输grpc头的valueContext
func CreateHeaderContextWithReqId(timeout time.Duration, reqID string) (context.Context, context.CancelFunc) {
md := metadata.New(map[string]string{headerRequestID: reqID})
var ctx context.Context
var cancel context.CancelFunc
if timeout > 0 {
ctx, cancel = context.WithTimeout(context.Background(), timeout)
} else {
ctx = context.Background()
cancel = nil
func AppendAuthHeader(token string) func(map[string]string) {
return func(header map[string]string) {
header[headerAuthToken] = token
}
return metadata.NewOutgoingContext(ctx, md), cancel
}

func AppendHeaderWithReqId(header map[string]string, reqID string) map[string]string {
m := make(map[string]string, len(header)+1)
for k, v := range header {
m[k] = v
func AppendHeaderWithReqId(reqID string) func(map[string]string) {
return func(header map[string]string) {
header[headerRequestID] = reqID
}
m[headerRequestID] = reqID
return m
}
10 changes: 6 additions & 4 deletions plugin/serverconnector/grpc/operation_async.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,13 @@ func (g *Connector) GetConnectionManager() network.ConnectionManager {
}

// 创建服务发现客户端
func (g *Connector) createDiscoverClient(reqID string,
connection *network.Connection, timeout time.Duration) (connector.DiscoverClient, context.CancelFunc, error) {
func (g *Connector) createDiscoverClient(args *connector.DiscoverClientCreatorArgs) (connector.DiscoverClient, context.CancelFunc, error) {
// 创建namingClient对象
client := apiservice.NewPolarisGRPCClient(network.ToGRPCConn(connection.Conn))
outgoingCtx, cancel := connector.CreateHeaderContextWithReqId(timeout, reqID)
client := apiservice.NewPolarisGRPCClient(network.ToGRPCConn(args.Connection.Conn))
outgoingCtx, cancel := connector.CreateHeadersContext(args.Timeout,
connector.AppendAuthHeader(args.AuthToken),
connector.AppendHeaderWithReqId(args.ReqId))

discoverClient, err := client.Discover(outgoingCtx)
return discoverClient, cancel, err
}
Expand Down
8 changes: 4 additions & 4 deletions plugin/serverconnector/grpc/operation_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (g *Connector) RegisterInstance(req *model.InstanceRegisterRequest, header
var (
namingClient = apiservice.NewPolarisGRPCClient(network.ToGRPCConn(conn.Conn))
reqID = connector.NextRegisterInstanceReqID()
ctx, cancel = connector.CreateHeaderContext(*req.Timeout, connector.AppendHeaderWithReqId(header, reqID))
ctx, cancel = connector.CreateHeadersContext(*req.Timeout, connector.AppendHeaderWithReqId(reqID))
)

if cancel != nil {
Expand Down Expand Up @@ -120,7 +120,7 @@ func (g *Connector) DeregisterInstance(req *model.InstanceDeRegisterRequest) err
var (
namingClient = apiservice.NewPolarisGRPCClient(network.ToGRPCConn(conn.Conn))
reqID = connector.NextDeRegisterInstanceReqID()
ctx, cancel = connector.CreateHeaderContextWithReqId(*req.Timeout, reqID)
ctx, cancel = connector.CreateHeadersContext(*req.Timeout, connector.AppendHeaderWithReqId(reqID))
)
if cancel != nil {
defer cancel()
Expand Down Expand Up @@ -181,7 +181,7 @@ func (g *Connector) Heartbeat(req *model.InstanceHeartbeatRequest) error {
var (
namingClient = apiservice.NewPolarisGRPCClient(network.ToGRPCConn(conn.Conn))
reqID = connector.NextHeartbeatReqID()
ctx, cancel = connector.CreateHeaderContextWithReqId(*req.Timeout, reqID)
ctx, cancel = connector.CreateHeadersContext(*req.Timeout, connector.AppendHeaderWithReqId(reqID))
)
if cancel != nil {
defer cancel()
Expand Down Expand Up @@ -269,7 +269,7 @@ func (g *Connector) ReportClient(req *model.ReportClientRequest) (*model.ReportC
var (
namingClient = apiservice.NewPolarisGRPCClient(network.ToGRPCConn(conn.Conn))
reqID = connector.NextReportClientReqID()
ctx, cancel = connector.CreateHeaderContextWithReqId(req.Timeout, reqID)
ctx, cancel = connector.CreateHeadersContext(req.Timeout, connector.AppendHeaderWithReqId(reqID))
)
if cancel != nil {
defer cancel()
Expand Down

0 comments on commit 60ecfda

Please sign in to comment.