Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: remove the code of Local TSO from PD client #8803

Merged
merged 3 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 18 additions & 25 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/utils/tlsutil"
"github.com/tikv/pd/client/utils/tsoutil"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -714,26 +713,28 @@

// GetTSAsync implements the TSOClient interface.
func (c *client) GetTSAsync(ctx context.Context) TSFuture {
return c.GetLocalTSAsync(ctx, globalDCLocation)
}

// GetLocalTSAsync implements the TSOClient interface.
func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFuture {
defer trace.StartRegion(ctx, "pdclient.GetLocalTSAsync").End()
defer trace.StartRegion(ctx, "pdclient.GetTSAsync").End()
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetLocalTSAsync", opentracing.ChildOf(span.Context()))
span = span.Tracer().StartSpan("pdclient.GetTSAsync", opentracing.ChildOf(span.Context()))

Check warning on line 718 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L718

Added line #L718 was not covered by tests
defer span.Finish()
}
return c.dispatchTSORequestWithRetry(ctx)
}

return c.dispatchTSORequestWithRetry(ctx, dcLocation)
// GetLocalTSAsync implements the TSOClient interface.
//
// Deprecated: Local TSO will be completely removed in the future. Currently, regardless of the
// parameters passed in, this method will default to returning the global TSO.
func (c *client) GetLocalTSAsync(ctx context.Context, _ string) TSFuture {
return c.GetTSAsync(ctx)

Check warning on line 729 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L728-L729

Added lines #L728 - L729 were not covered by tests
}

const (
dispatchRetryDelay = 50 * time.Millisecond
dispatchRetryCount = 2
)

func (c *client) dispatchTSORequestWithRetry(ctx context.Context, dcLocation string) TSFuture {
func (c *client) dispatchTSORequestWithRetry(ctx context.Context) TSFuture {
var (
retryable bool
err error
Expand All @@ -752,7 +753,7 @@
}
// Get a new request from the pool if it's nil or not from the current pool.
if req == nil || req.pool != tsoClient.tsoReqPool {
req = tsoClient.getTSORequest(ctx, dcLocation)
req = tsoClient.getTSORequest(ctx)
}
retryable, err = tsoClient.dispatchRequest(req)
if !retryable {
Expand All @@ -775,9 +776,11 @@
}

// GetLocalTS implements the TSOClient interface.
func (c *client) GetLocalTS(ctx context.Context, dcLocation string) (physical int64, logical int64, err error) {
resp := c.GetLocalTSAsync(ctx, dcLocation)
return resp.Wait()
//
// Deprecated: Local TSO will be completely removed in the future. Currently, regardless of the
// parameters passed in, this method will default to returning the global TSO.
func (c *client) GetLocalTS(ctx context.Context, _ string) (physical int64, logical int64, err error) {
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
return c.GetTS(ctx)

Check warning on line 783 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L782-L783

Added lines #L782 - L783 were not covered by tests
}

// GetMinTS implements the TSOClient interface.
Expand Down Expand Up @@ -823,7 +826,7 @@
}

minTS := resp.GetTimestamp()
return minTS.Physical, tsoutil.AddLogical(minTS.Logical, 0, minTS.SuffixBits), nil
return minTS.Physical, minTS.Logical, nil
}

func handleRegionResponse(res *pdpb.GetRegionResponse) *Region {
Expand Down Expand Up @@ -1600,13 +1603,3 @@
}
return nil
}

// GetTSOAllocators returns {dc-location -> TSO allocator leader URL} connection map
// For test only.
func (c *client) GetTSOAllocators() *sync.Map {
tsoClient := c.getTSOClient()
if tsoClient == nil {
return nil
}
return tsoClient.GetTSOAllocators()
}
99 changes: 25 additions & 74 deletions client/pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import (
"context"
"crypto/tls"
"fmt"
"net/url"
"reflect"
"sort"
Expand All @@ -40,7 +41,6 @@
)

const (
globalDCLocation = "global"
memberUpdateInterval = time.Minute
serviceModeUpdateInterval = 3 * time.Second
updateMemberTimeout = time.Second // Use a shorter timeout to recover faster from network isolation.
Expand Down Expand Up @@ -383,21 +383,17 @@
}

type updateKeyspaceIDFunc func() error
type tsoLocalServURLsUpdatedFunc func(map[string]string) error
type tsoGlobalServURLUpdatedFunc func(string) error
type tsoLeaderURLUpdatedFunc func(string) error

type tsoAllocatorEventSource interface {
// SetTSOLocalServURLsUpdatedCallback adds a callback which will be called when the local tso
// allocator leader list is updated.
SetTSOLocalServURLsUpdatedCallback(callback tsoLocalServURLsUpdatedFunc)
// SetTSOGlobalServURLUpdatedCallback adds a callback which will be called when the global tso
// allocator leader is updated.
SetTSOGlobalServURLUpdatedCallback(callback tsoGlobalServURLUpdatedFunc)
// tsoEventSource subscribes to events related to changes in the TSO leader/primary from the service discovery.
type tsoEventSource interface {
// SetTSOLeaderURLUpdatedCallback adds a callback which will be called when the TSO leader/primary is updated.
SetTSOLeaderURLUpdatedCallback(callback tsoLeaderURLUpdatedFunc)
}

var (
_ ServiceDiscovery = (*pdServiceDiscovery)(nil)
_ tsoAllocatorEventSource = (*pdServiceDiscovery)(nil)
_ ServiceDiscovery = (*pdServiceDiscovery)(nil)
_ tsoEventSource = (*pdServiceDiscovery)(nil)
)

// pdServiceDiscovery is the service discovery client of PD/API service which is quorum based
Expand Down Expand Up @@ -426,12 +422,8 @@
// membersChangedCbs will be called after there is any membership change in the
// leader and followers
membersChangedCbs []func()
// tsoLocalAllocLeadersUpdatedCb will be called when the local tso allocator
// leader list is updated. The input is a map {DC Location -> Leader URL}
tsoLocalAllocLeadersUpdatedCb tsoLocalServURLsUpdatedFunc
// tsoGlobalAllocLeaderUpdatedCb will be called when the global tso allocator
// leader is updated.
tsoGlobalAllocLeaderUpdatedCb tsoGlobalServURLUpdatedFunc
// tsoLeaderUpdatedCb will be called when the TSO leader is updated.
tsoLeaderUpdatedCb tsoLeaderURLUpdatedFunc

checkMembershipCh chan struct{}

Expand Down Expand Up @@ -801,22 +793,15 @@
c.membersChangedCbs = append(c.membersChangedCbs, callbacks...)
}

// SetTSOLocalServURLsUpdatedCallback adds a callback which will be called when the local tso
// allocator leader list is updated.
func (c *pdServiceDiscovery) SetTSOLocalServURLsUpdatedCallback(callback tsoLocalServURLsUpdatedFunc) {
c.tsoLocalAllocLeadersUpdatedCb = callback
}

// SetTSOGlobalServURLUpdatedCallback adds a callback which will be called when the global tso
// allocator leader is updated.
func (c *pdServiceDiscovery) SetTSOGlobalServURLUpdatedCallback(callback tsoGlobalServURLUpdatedFunc) {
// SetTSOLeaderURLUpdatedCallback adds a callback which will be called when the TSO leader is updated.
func (c *pdServiceDiscovery) SetTSOLeaderURLUpdatedCallback(callback tsoLeaderURLUpdatedFunc) {
url := c.getLeaderURL()
if len(url) > 0 {
if err := callback(url); err != nil {
log.Error("[tso] failed to call back when tso global service url update", zap.String("url", url), errs.ZapError(err))
log.Error("[tso] failed to call back when tso leader url update", zap.String("url", url), errs.ZapError(err))

Check warning on line 801 in client/pd_service_discovery.go

View check run for this annotation

Codecov / codecov/patch

client/pd_service_discovery.go#L801

Added line #L801 was not covered by tests
}
}
c.tsoGlobalAllocLeaderUpdatedCb = callback
c.tsoLeaderUpdatedCb = callback
}

// getLeaderURL returns the leader URL.
Expand Down Expand Up @@ -901,19 +886,16 @@

members, err := c.getMembers(c.ctx, url, updateMemberTimeout)
// Check the cluster ID.
if err == nil && members.GetHeader().GetClusterId() != c.clusterID {
err = errs.ErrClientUpdateMember.FastGenByArgs("cluster id does not match")
updatedClusterID := members.GetHeader().GetClusterId()
if err == nil && updatedClusterID != c.clusterID {
log.Warn("[pd] cluster id does not match",
zap.Uint64("updated-cluster-id", updatedClusterID),
zap.Uint64("expected-cluster-id", c.clusterID))
err = errs.ErrClientUpdateMember.FastGenByArgs(fmt.Sprintf("cluster id does not match: %d != %d", updatedClusterID, c.clusterID))

Check warning on line 894 in client/pd_service_discovery.go

View check run for this annotation

Codecov / codecov/patch

client/pd_service_discovery.go#L891-L894

Added lines #L891 - L894 were not covered by tests
}
// Check the TSO Allocator Leader.
var errTSO error
if err == nil {
if members.GetLeader() == nil || len(members.GetLeader().GetClientUrls()) == 0 {
err = errs.ErrClientGetLeader.FastGenByArgs("leader url doesn't exist")
}
// Still need to update TsoAllocatorLeaders, even if there is no PD leader
errTSO = c.switchTSOAllocatorLeaders(members.GetTsoAllocatorLeaders())
if err == nil && (members.GetLeader() == nil || len(members.GetLeader().GetClientUrls()) == 0) {
err = errs.ErrClientGetLeader.FastGenByArgs("leader url doesn't exist")
}

// Failed to get members
if err != nil {
log.Info("[pd] cannot update member from this url",
Expand All @@ -926,15 +908,9 @@
continue
}
}

c.updateURLs(members.GetMembers())
if err := c.updateServiceClient(members.GetMembers(), members.GetLeader()); err != nil {
return err
}

// If `switchLeader` succeeds but `switchTSOAllocatorLeader` has an error,
// the error of `switchTSOAllocatorLeader` will be returned.
return errTSO
return c.updateServiceClient(members.GetMembers(), members.GetLeader())
}
return errs.ErrClientGetMember.FastGenByArgs()
}
Expand Down Expand Up @@ -1009,13 +985,12 @@
newConn, err := c.GetOrCreateGRPCConn(url)
// If gRPC connect is created successfully or leader is new, still saves.
if url != oldLeader.GetURL() || newConn != nil {
// Set PD leader and Global TSO Allocator (which is also the PD leader)
leaderClient := newPDServiceClient(url, url, newConn, true)
c.leader.Store(leaderClient)
}
// Run callbacks
if c.tsoGlobalAllocLeaderUpdatedCb != nil {
if err := c.tsoGlobalAllocLeaderUpdatedCb(url); err != nil {
if c.tsoLeaderUpdatedCb != nil {
if err := c.tsoLeaderUpdatedCb(url); err != nil {
return true, err
}
}
Expand Down Expand Up @@ -1102,30 +1077,6 @@
return err
}

func (c *pdServiceDiscovery) switchTSOAllocatorLeaders(allocatorMap map[string]*pdpb.Member) error {
if len(allocatorMap) == 0 {
return nil
}

allocMap := make(map[string]string)
// Switch to the new one
for dcLocation, member := range allocatorMap {
if len(member.GetClientUrls()) == 0 {
continue
}
allocMap[dcLocation] = member.GetClientUrls()[0]
}

// Run the callback to reflect any possible change in the local tso allocators.
if c.tsoLocalAllocLeadersUpdatedCb != nil {
if err := c.tsoLocalAllocLeadersUpdatedCb(allocMap); err != nil {
return err
}
}

return nil
}

// GetOrCreateGRPCConn returns the corresponding grpc client connection of the given URL.
func (c *pdServiceDiscovery) GetOrCreateGRPCConn(url string) (*grpc.ClientConn, error) {
return grpcutil.GetOrCreateGRPCConn(c.ctx, &c.clientConns, url, c.tlsCfg, c.option.gRPCDialOptions...)
Expand Down
Loading