Skip to content

Commit

Permalink
Remove the code of Local TSO from PD client
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Nov 13, 2024
1 parent f941078 commit c8870a6
Show file tree
Hide file tree
Showing 13 changed files with 168 additions and 644 deletions.
29 changes: 9 additions & 20 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,26 +714,25 @@ func (c *client) getRegionAPIClientAndContext(ctx context.Context, allowFollower

// GetTSAsync implements the TSOClient interface.
func (c *client) GetTSAsync(ctx context.Context) TSFuture {
return c.GetLocalTSAsync(ctx, globalDCLocation)
}

// GetLocalTSAsync implements the TSOClient interface.
func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFuture {
defer trace.StartRegion(ctx, "pdclient.GetLocalTSAsync").End()
defer trace.StartRegion(ctx, "pdclient.GetTSAsync").End()
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetLocalTSAsync", opentracing.ChildOf(span.Context()))
span = span.Tracer().StartSpan("pdclient.GetTSAsync", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
return c.dispatchTSORequestWithRetry(ctx)
}

return c.dispatchTSORequestWithRetry(ctx, dcLocation)
// GetLocalTSAsync implements the TSOClient interface.
func (c *client) GetLocalTSAsync(ctx context.Context, _ string) TSFuture {
return c.GetTSAsync(ctx)
}

const (
dispatchRetryDelay = 50 * time.Millisecond
dispatchRetryCount = 2
)

func (c *client) dispatchTSORequestWithRetry(ctx context.Context, dcLocation string) TSFuture {
func (c *client) dispatchTSORequestWithRetry(ctx context.Context) TSFuture {
var (
retryable bool
err error
Expand All @@ -752,7 +751,7 @@ func (c *client) dispatchTSORequestWithRetry(ctx context.Context, dcLocation str
}
// Get a new request from the pool if it's nil or not from the current pool.
if req == nil || req.pool != tsoClient.tsoReqPool {
req = tsoClient.getTSORequest(ctx, dcLocation)
req = tsoClient.getTSORequest(ctx)
}
retryable, err = tsoClient.dispatchRequest(req)
if !retryable {
Expand Down Expand Up @@ -1600,13 +1599,3 @@ func (c *client) respForErr(observer prometheus.Observer, start time.Time, err e
}
return nil
}

// GetTSOAllocators returns {dc-location -> TSO allocator leader URL} connection map
// For test only.
func (c *client) GetTSOAllocators() *sync.Map {
tsoClient := c.getTSOClient()
if tsoClient == nil {
return nil
}
return tsoClient.GetTSOAllocators()
}
89 changes: 17 additions & 72 deletions client/pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
)

const (
globalDCLocation = "global"
memberUpdateInterval = time.Minute
serviceModeUpdateInterval = 3 * time.Second
updateMemberTimeout = time.Second // Use a shorter timeout to recover faster from network isolation.
Expand Down Expand Up @@ -383,21 +382,16 @@ func (c *pdServiceBalancer) get() (ret ServiceClient) {
}

type updateKeyspaceIDFunc func() error
type tsoLocalServURLsUpdatedFunc func(map[string]string) error
type tsoGlobalServURLUpdatedFunc func(string) error
type tsoLeaderURLUpdatedFunc func(string) error

type tsoAllocatorEventSource interface {
// SetTSOLocalServURLsUpdatedCallback adds a callback which will be called when the local tso
// allocator leader list is updated.
SetTSOLocalServURLsUpdatedCallback(callback tsoLocalServURLsUpdatedFunc)
// SetTSOGlobalServURLUpdatedCallback adds a callback which will be called when the global tso
// allocator leader is updated.
SetTSOGlobalServURLUpdatedCallback(callback tsoGlobalServURLUpdatedFunc)
type tsoEventSource interface {
// SetTSOLeaderURLUpdatedCallback adds a callback which will be called when the TSO leader is updated.
SetTSOLeaderURLUpdatedCallback(callback tsoLeaderURLUpdatedFunc)
}

var (
_ ServiceDiscovery = (*pdServiceDiscovery)(nil)
_ tsoAllocatorEventSource = (*pdServiceDiscovery)(nil)
_ ServiceDiscovery = (*pdServiceDiscovery)(nil)
_ tsoEventSource = (*pdServiceDiscovery)(nil)
)

// pdServiceDiscovery is the service discovery client of PD/API service which is quorum based
Expand Down Expand Up @@ -426,12 +420,8 @@ type pdServiceDiscovery struct {
// membersChangedCbs will be called after there is any membership change in the
// leader and followers
membersChangedCbs []func()
// tsoLocalAllocLeadersUpdatedCb will be called when the local tso allocator
// leader list is updated. The input is a map {DC Location -> Leader URL}
tsoLocalAllocLeadersUpdatedCb tsoLocalServURLsUpdatedFunc
// tsoGlobalAllocLeaderUpdatedCb will be called when the global tso allocator
// leader is updated.
tsoGlobalAllocLeaderUpdatedCb tsoGlobalServURLUpdatedFunc
// tsoLeaderUpdatedCb will be called when the TSO leader is updated.
tsoLeaderUpdatedCb tsoLeaderURLUpdatedFunc

checkMembershipCh chan struct{}

Expand Down Expand Up @@ -801,22 +791,15 @@ func (c *pdServiceDiscovery) AddServiceURLsSwitchedCallback(callbacks ...func())
c.membersChangedCbs = append(c.membersChangedCbs, callbacks...)
}

// SetTSOLocalServURLsUpdatedCallback adds a callback which will be called when the local tso
// allocator leader list is updated.
func (c *pdServiceDiscovery) SetTSOLocalServURLsUpdatedCallback(callback tsoLocalServURLsUpdatedFunc) {
c.tsoLocalAllocLeadersUpdatedCb = callback
}

// SetTSOGlobalServURLUpdatedCallback adds a callback which will be called when the global tso
// allocator leader is updated.
func (c *pdServiceDiscovery) SetTSOGlobalServURLUpdatedCallback(callback tsoGlobalServURLUpdatedFunc) {
// SetTSOLeaderURLUpdatedCallback adds a callback which will be called when the TSO leader is updated.
func (c *pdServiceDiscovery) SetTSOLeaderURLUpdatedCallback(callback tsoLeaderURLUpdatedFunc) {
url := c.getLeaderURL()
if len(url) > 0 {
if err := callback(url); err != nil {
log.Error("[tso] failed to call back when tso global service url update", zap.String("url", url), errs.ZapError(err))
log.Error("[tso] failed to call back when tso leader url update", zap.String("url", url), errs.ZapError(err))
}
}
c.tsoGlobalAllocLeaderUpdatedCb = callback
c.tsoLeaderUpdatedCb = callback
}

// getLeaderURL returns the leader URL.
Expand Down Expand Up @@ -904,16 +887,9 @@ func (c *pdServiceDiscovery) updateMember() error {
if err == nil && members.GetHeader().GetClusterId() != c.clusterID {
err = errs.ErrClientUpdateMember.FastGenByArgs("cluster id does not match")
}
// Check the TSO Allocator Leader.
var errTSO error
if err == nil {
if members.GetLeader() == nil || len(members.GetLeader().GetClientUrls()) == 0 {
err = errs.ErrClientGetLeader.FastGenByArgs("leader url doesn't exist")
}
// Still need to update TsoAllocatorLeaders, even if there is no PD leader
errTSO = c.switchTSOAllocatorLeaders(members.GetTsoAllocatorLeaders())
if err == nil && (members.GetLeader() == nil || len(members.GetLeader().GetClientUrls()) == 0) {
err = errs.ErrClientGetLeader.FastGenByArgs("leader url doesn't exist")
}

// Failed to get members
if err != nil {
log.Info("[pd] cannot update member from this url",
Expand All @@ -926,15 +902,9 @@ func (c *pdServiceDiscovery) updateMember() error {
continue
}
}

c.updateURLs(members.GetMembers())
if err := c.updateServiceClient(members.GetMembers(), members.GetLeader()); err != nil {
return err
}

// If `switchLeader` succeeds but `switchTSOAllocatorLeader` has an error,
// the error of `switchTSOAllocatorLeader` will be returned.
return errTSO
return c.updateServiceClient(members.GetMembers(), members.GetLeader())
}
return errs.ErrClientGetMember.FastGenByArgs()
}
Expand Down Expand Up @@ -1009,13 +979,12 @@ func (c *pdServiceDiscovery) switchLeader(url string) (bool, error) {
newConn, err := c.GetOrCreateGRPCConn(url)
// If gRPC connect is created successfully or leader is new, still saves.
if url != oldLeader.GetURL() || newConn != nil {
// Set PD leader and Global TSO Allocator (which is also the PD leader)
leaderClient := newPDServiceClient(url, url, newConn, true)
c.leader.Store(leaderClient)
}
// Run callbacks
if c.tsoGlobalAllocLeaderUpdatedCb != nil {
if err := c.tsoGlobalAllocLeaderUpdatedCb(url); err != nil {
if c.tsoLeaderUpdatedCb != nil {
if err := c.tsoLeaderUpdatedCb(url); err != nil {
return true, err
}
}
Expand Down Expand Up @@ -1102,30 +1071,6 @@ func (c *pdServiceDiscovery) updateServiceClient(members []*pdpb.Member, leader
return err
}

func (c *pdServiceDiscovery) switchTSOAllocatorLeaders(allocatorMap map[string]*pdpb.Member) error {
if len(allocatorMap) == 0 {
return nil
}

allocMap := make(map[string]string)
// Switch to the new one
for dcLocation, member := range allocatorMap {
if len(member.GetClientUrls()) == 0 {
continue
}
allocMap[dcLocation] = member.GetClientUrls()[0]
}

// Run the callback to reflect any possible change in the local tso allocators.
if c.tsoLocalAllocLeadersUpdatedCb != nil {
if err := c.tsoLocalAllocLeadersUpdatedCb(allocMap); err != nil {
return err
}
}

return nil
}

// GetOrCreateGRPCConn returns the corresponding grpc client connection of the given URL.
func (c *pdServiceDiscovery) GetOrCreateGRPCConn(url string) (*grpc.ClientConn, error) {
return grpcutil.GetOrCreateGRPCConn(c.ctx, &c.clientConns, url, c.tlsCfg, c.option.gRPCDialOptions...)
Expand Down
Loading

0 comments on commit c8870a6

Please sign in to comment.