diff --git a/client/client.go b/client/client.go index e1607051a34..0c2ccbd3805 100644 --- a/client/client.go +++ b/client/client.go @@ -714,18 +714,17 @@ func (c *client) getRegionAPIClientAndContext(ctx context.Context, allowFollower // GetTSAsync implements the TSOClient interface. func (c *client) GetTSAsync(ctx context.Context) TSFuture { - return c.GetLocalTSAsync(ctx, globalDCLocation) -} - -// GetLocalTSAsync implements the TSOClient interface. -func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFuture { - defer trace.StartRegion(ctx, "pdclient.GetLocalTSAsync").End() + defer trace.StartRegion(ctx, "pdclient.GetTSAsync").End() if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span = span.Tracer().StartSpan("pdclient.GetLocalTSAsync", opentracing.ChildOf(span.Context())) + span = span.Tracer().StartSpan("pdclient.GetTSAsync", opentracing.ChildOf(span.Context())) defer span.Finish() } + return c.dispatchTSORequestWithRetry(ctx) +} - return c.dispatchTSORequestWithRetry(ctx, dcLocation) +// GetLocalTSAsync implements the TSOClient interface. +func (c *client) GetLocalTSAsync(ctx context.Context, _ string) TSFuture { + return c.GetTSAsync(ctx) } const ( @@ -733,7 +732,7 @@ const ( dispatchRetryCount = 2 ) -func (c *client) dispatchTSORequestWithRetry(ctx context.Context, dcLocation string) TSFuture { +func (c *client) dispatchTSORequestWithRetry(ctx context.Context) TSFuture { var ( retryable bool err error @@ -752,7 +751,7 @@ func (c *client) dispatchTSORequestWithRetry(ctx context.Context, dcLocation str } // Get a new request from the pool if it's nil or not from the current pool. if req == nil || req.pool != tsoClient.tsoReqPool { - req = tsoClient.getTSORequest(ctx, dcLocation) + req = tsoClient.getTSORequest(ctx) } retryable, err = tsoClient.dispatchRequest(req) if !retryable { @@ -1600,13 +1599,3 @@ func (c *client) respForErr(observer prometheus.Observer, start time.Time, err e } return nil } - -// GetTSOAllocators returns {dc-location -> TSO allocator leader URL} connection map -// For test only. -func (c *client) GetTSOAllocators() *sync.Map { - tsoClient := c.getTSOClient() - if tsoClient == nil { - return nil - } - return tsoClient.GetTSOAllocators() -} diff --git a/client/pd_service_discovery.go b/client/pd_service_discovery.go index 94c08f5c10d..c332ee7066b 100644 --- a/client/pd_service_discovery.go +++ b/client/pd_service_discovery.go @@ -40,7 +40,6 @@ import ( ) const ( - globalDCLocation = "global" memberUpdateInterval = time.Minute serviceModeUpdateInterval = 3 * time.Second updateMemberTimeout = time.Second // Use a shorter timeout to recover faster from network isolation. @@ -383,21 +382,16 @@ func (c *pdServiceBalancer) get() (ret ServiceClient) { } type updateKeyspaceIDFunc func() error -type tsoLocalServURLsUpdatedFunc func(map[string]string) error -type tsoGlobalServURLUpdatedFunc func(string) error +type tsoLeaderURLUpdatedFunc func(string) error -type tsoAllocatorEventSource interface { - // SetTSOLocalServURLsUpdatedCallback adds a callback which will be called when the local tso - // allocator leader list is updated. - SetTSOLocalServURLsUpdatedCallback(callback tsoLocalServURLsUpdatedFunc) - // SetTSOGlobalServURLUpdatedCallback adds a callback which will be called when the global tso - // allocator leader is updated. - SetTSOGlobalServURLUpdatedCallback(callback tsoGlobalServURLUpdatedFunc) +type tsoEventSource interface { + // SetTSOLeaderURLUpdatedCallback adds a callback which will be called when the TSO leader is updated. + SetTSOLeaderURLUpdatedCallback(callback tsoLeaderURLUpdatedFunc) } var ( - _ ServiceDiscovery = (*pdServiceDiscovery)(nil) - _ tsoAllocatorEventSource = (*pdServiceDiscovery)(nil) + _ ServiceDiscovery = (*pdServiceDiscovery)(nil) + _ tsoEventSource = (*pdServiceDiscovery)(nil) ) // pdServiceDiscovery is the service discovery client of PD/API service which is quorum based @@ -426,12 +420,8 @@ type pdServiceDiscovery struct { // membersChangedCbs will be called after there is any membership change in the // leader and followers membersChangedCbs []func() - // tsoLocalAllocLeadersUpdatedCb will be called when the local tso allocator - // leader list is updated. The input is a map {DC Location -> Leader URL} - tsoLocalAllocLeadersUpdatedCb tsoLocalServURLsUpdatedFunc - // tsoGlobalAllocLeaderUpdatedCb will be called when the global tso allocator - // leader is updated. - tsoGlobalAllocLeaderUpdatedCb tsoGlobalServURLUpdatedFunc + // tsoLeaderUpdatedCb will be called when the TSO leader is updated. + tsoLeaderUpdatedCb tsoLeaderURLUpdatedFunc checkMembershipCh chan struct{} @@ -801,22 +791,15 @@ func (c *pdServiceDiscovery) AddServiceURLsSwitchedCallback(callbacks ...func()) c.membersChangedCbs = append(c.membersChangedCbs, callbacks...) } -// SetTSOLocalServURLsUpdatedCallback adds a callback which will be called when the local tso -// allocator leader list is updated. -func (c *pdServiceDiscovery) SetTSOLocalServURLsUpdatedCallback(callback tsoLocalServURLsUpdatedFunc) { - c.tsoLocalAllocLeadersUpdatedCb = callback -} - -// SetTSOGlobalServURLUpdatedCallback adds a callback which will be called when the global tso -// allocator leader is updated. -func (c *pdServiceDiscovery) SetTSOGlobalServURLUpdatedCallback(callback tsoGlobalServURLUpdatedFunc) { +// SetTSOLeaderURLUpdatedCallback adds a callback which will be called when the TSO leader is updated. +func (c *pdServiceDiscovery) SetTSOLeaderURLUpdatedCallback(callback tsoLeaderURLUpdatedFunc) { url := c.getLeaderURL() if len(url) > 0 { if err := callback(url); err != nil { - log.Error("[tso] failed to call back when tso global service url update", zap.String("url", url), errs.ZapError(err)) + log.Error("[tso] failed to call back when tso leader url update", zap.String("url", url), errs.ZapError(err)) } } - c.tsoGlobalAllocLeaderUpdatedCb = callback + c.tsoLeaderUpdatedCb = callback } // getLeaderURL returns the leader URL. @@ -904,16 +887,9 @@ func (c *pdServiceDiscovery) updateMember() error { if err == nil && members.GetHeader().GetClusterId() != c.clusterID { err = errs.ErrClientUpdateMember.FastGenByArgs("cluster id does not match") } - // Check the TSO Allocator Leader. - var errTSO error - if err == nil { - if members.GetLeader() == nil || len(members.GetLeader().GetClientUrls()) == 0 { - err = errs.ErrClientGetLeader.FastGenByArgs("leader url doesn't exist") - } - // Still need to update TsoAllocatorLeaders, even if there is no PD leader - errTSO = c.switchTSOAllocatorLeaders(members.GetTsoAllocatorLeaders()) + if err == nil && (members.GetLeader() == nil || len(members.GetLeader().GetClientUrls()) == 0) { + err = errs.ErrClientGetLeader.FastGenByArgs("leader url doesn't exist") } - // Failed to get members if err != nil { log.Info("[pd] cannot update member from this url", @@ -926,15 +902,9 @@ func (c *pdServiceDiscovery) updateMember() error { continue } } - c.updateURLs(members.GetMembers()) - if err := c.updateServiceClient(members.GetMembers(), members.GetLeader()); err != nil { - return err - } - // If `switchLeader` succeeds but `switchTSOAllocatorLeader` has an error, - // the error of `switchTSOAllocatorLeader` will be returned. - return errTSO + return c.updateServiceClient(members.GetMembers(), members.GetLeader()) } return errs.ErrClientGetMember.FastGenByArgs() } @@ -1009,13 +979,12 @@ func (c *pdServiceDiscovery) switchLeader(url string) (bool, error) { newConn, err := c.GetOrCreateGRPCConn(url) // If gRPC connect is created successfully or leader is new, still saves. if url != oldLeader.GetURL() || newConn != nil { - // Set PD leader and Global TSO Allocator (which is also the PD leader) leaderClient := newPDServiceClient(url, url, newConn, true) c.leader.Store(leaderClient) } // Run callbacks - if c.tsoGlobalAllocLeaderUpdatedCb != nil { - if err := c.tsoGlobalAllocLeaderUpdatedCb(url); err != nil { + if c.tsoLeaderUpdatedCb != nil { + if err := c.tsoLeaderUpdatedCb(url); err != nil { return true, err } } @@ -1102,30 +1071,6 @@ func (c *pdServiceDiscovery) updateServiceClient(members []*pdpb.Member, leader return err } -func (c *pdServiceDiscovery) switchTSOAllocatorLeaders(allocatorMap map[string]*pdpb.Member) error { - if len(allocatorMap) == 0 { - return nil - } - - allocMap := make(map[string]string) - // Switch to the new one - for dcLocation, member := range allocatorMap { - if len(member.GetClientUrls()) == 0 { - continue - } - allocMap[dcLocation] = member.GetClientUrls()[0] - } - - // Run the callback to reflect any possible change in the local tso allocators. - if c.tsoLocalAllocLeadersUpdatedCb != nil { - if err := c.tsoLocalAllocLeadersUpdatedCb(allocMap); err != nil { - return err - } - } - - return nil -} - // GetOrCreateGRPCConn returns the corresponding grpc client connection of the given URL. func (c *pdServiceDiscovery) GetOrCreateGRPCConn(url string) (*grpc.ClientConn, error) { return grpcutil.GetOrCreateGRPCConn(c.ctx, &c.clientConns, url, c.tlsCfg, c.option.gRPCDialOptions...) diff --git a/client/tso_client.go b/client/tso_client.go index d52a221c542..0cbc6280143 100644 --- a/client/tso_client.go +++ b/client/tso_client.go @@ -16,10 +16,10 @@ package pd import ( "context" - "fmt" "math/rand" "runtime/trace" "sync" + "sync/atomic" "time" "github.com/pingcap/errors" @@ -35,7 +35,6 @@ import ( ) const ( - tsoDispatcherCheckInterval = time.Minute // defaultMaxTSOBatchSize is the default max size of the TSO request batch. defaultMaxTSOBatchSize = 10000 // retryInterval and maxRetryTimes are used to control the retry interval and max retry times. @@ -49,13 +48,16 @@ type TSOClient interface { GetTS(ctx context.Context) (int64, int64, error) // GetTSAsync gets a timestamp from PD or TSO microservice, without block the caller. GetTSAsync(ctx context.Context) TSFuture - // GetLocalTS gets a local timestamp from PD or TSO microservice. - GetLocalTS(ctx context.Context, dcLocation string) (int64, int64, error) - // GetLocalTSAsync gets a local timestamp from PD or TSO microservice, without block the caller. - GetLocalTSAsync(ctx context.Context, dcLocation string) TSFuture // GetMinTS gets a timestamp from PD or the minimal timestamp across all keyspace groups from // the TSO microservice. GetMinTS(ctx context.Context) (int64, int64, error) + + // GetLocalTS gets a local timestamp from PD or TSO microservice. + // Deprecated: Local TSO will be completely removed in the future. + GetLocalTS(ctx context.Context, _ string) (int64, int64, error) + // GetLocalTSAsync gets a local timestamp from PD or TSO microservice, without block the caller. + // Deprecated: Local TSO will be completely removed in the future. + GetLocalTSAsync(ctx context.Context, _ string) TSFuture } type tsoClient struct { @@ -66,19 +68,13 @@ type tsoClient struct { svcDiscovery ServiceDiscovery tsoStreamBuilderFactory - // tsoAllocators defines the mapping {dc-location -> TSO allocator leader URL} - tsoAllocators sync.Map // Store as map[string]string - // tsoAllocServingURLSwitchedCallback will be called when any global/local - // tso allocator leader is switched. - tsoAllocServingURLSwitchedCallback []func() + // leaderURL is the URL of the TSO leader. + leaderURL atomic.Value // tsoReqPool is the pool to recycle `*tsoRequest`. tsoReqPool *sync.Pool - // tsoDispatcher is used to dispatch different TSO requests to - // the corresponding dc-location TSO channel. - tsoDispatcher sync.Map // Same as map[string]*tsoDispatcher - - checkTSODispatcherCh chan struct{} + // dispatcher is used to dispatch the TSO requests to the channel. + dispatcher atomic.Pointer[tsoDispatcher] } // newTSOClient returns a new TSO client. @@ -102,13 +98,11 @@ func newTSOClient( } }, }, - checkTSODispatcherCh: make(chan struct{}, 1), } - eventSrc := svcDiscovery.(tsoAllocatorEventSource) - eventSrc.SetTSOLocalServURLsUpdatedCallback(c.updateTSOLocalServURLs) - eventSrc.SetTSOGlobalServURLUpdatedCallback(c.updateTSOGlobalServURL) - c.svcDiscovery.AddServiceURLsSwitchedCallback(c.scheduleUpdateAllTSOConnectionCtxs) + eventSrc := svcDiscovery.(tsoEventSource) + eventSrc.SetTSOLeaderURLUpdatedCallback(c.updateTSOLeaderURL) + c.svcDiscovery.AddServiceURLsSwitchedCallback(c.scheduleUpdateTSOConnectionCtxs) return c } @@ -117,36 +111,15 @@ func (c *tsoClient) getOption() *option { return c.option } func (c *tsoClient) getServiceDiscovery() ServiceDiscovery { return c.svcDiscovery } +func (c *tsoClient) getDispatcher() *tsoDispatcher { + return c.dispatcher.Load() +} + func (c *tsoClient) setup() { if err := c.svcDiscovery.CheckMemberChanged(); err != nil { log.Warn("[tso] failed to check member changed", errs.ZapError(err)) } - c.updateTSODispatcher() - - // Start the daemons. - c.wg.Add(1) - go c.tsoDispatcherCheckLoop() -} - -func (c *tsoClient) tsoDispatcherCheckLoop() { - log.Info("[tso] start tso dispatcher check loop") - defer log.Info("[tso] exit tso dispatcher check loop") - defer c.wg.Done() - - loopCtx, loopCancel := context.WithCancel(c.ctx) - defer loopCancel() - - ticker := time.NewTicker(tsoDispatcherCheckInterval) - defer ticker.Stop() - for { - c.updateTSODispatcher() - select { - case <-ticker.C: - case <-c.checkTSODispatcherCh: - case <-loopCtx.Done(): - return - } - } + c.tryCreateTSODispatcher() } // close closes the TSO client @@ -160,40 +133,16 @@ func (c *tsoClient) close() { c.wg.Wait() log.Info("[tso] close tso client") - c.closeTSODispatcher() + c.getDispatcher().close() log.Info("[tso] tso client is closed") } -func (c *tsoClient) scheduleCheckTSODispatcher() { - select { - case c.checkTSODispatcherCh <- struct{}{}: - default: - } -} - -// scheduleUpdateAllTSOConnectionCtxs update the TSO connection contexts for all dc-locations. -func (c *tsoClient) scheduleUpdateAllTSOConnectionCtxs() { - c.tsoDispatcher.Range(func(_, dispatcher any) bool { - dispatcher.(*tsoDispatcher).scheduleUpdateConnectionCtxs() - return true - }) -} - -// scheduleUpdateTSOConnectionCtxs update the TSO connection contexts for the given dc-location. -func (c *tsoClient) scheduleUpdateTSOConnectionCtxs(dcLocation string) { - dispatcher, ok := c.getTSODispatcher(dcLocation) - if !ok { - return - } - dispatcher.scheduleUpdateConnectionCtxs() -} - -// TSO Follower Proxy only supports the Global TSO proxy now. -func (c *tsoClient) allowTSOFollowerProxy(dc string) bool { - return dc == globalDCLocation && c.option.getEnableTSOFollowerProxy() +// scheduleUpdateTSOConnectionCtxs update the TSO connection contexts. +func (c *tsoClient) scheduleUpdateTSOConnectionCtxs() { + c.getDispatcher().scheduleUpdateConnectionCtxs() } -func (c *tsoClient) getTSORequest(ctx context.Context, dcLocation string) *tsoRequest { +func (c *tsoClient) getTSORequest(ctx context.Context) *tsoRequest { req := c.tsoReqPool.Get().(*tsoRequest) // Set needed fields in the request before using it. req.start = time.Now() @@ -202,121 +151,41 @@ func (c *tsoClient) getTSORequest(ctx context.Context, dcLocation string) *tsoRe req.clientCtx = c.ctx req.physical = 0 req.logical = 0 - req.dcLocation = dcLocation req.streamID = "" return req } -func (c *tsoClient) getTSODispatcher(dcLocation string) (*tsoDispatcher, bool) { - dispatcher, ok := c.tsoDispatcher.Load(dcLocation) - if !ok || dispatcher == nil { - return nil, false +func (c *tsoClient) getTSOLeaderURL() string { + url := c.leaderURL.Load() + if url == nil { + return "" } - return dispatcher.(*tsoDispatcher), true -} - -// GetTSOAllocators returns {dc-location -> TSO allocator leader URL} connection map -func (c *tsoClient) GetTSOAllocators() *sync.Map { - return &c.tsoAllocators + return url.(string) } -// GetTSOAllocatorServingURLByDCLocation returns the tso allocator of the given dcLocation -func (c *tsoClient) GetTSOAllocatorServingURLByDCLocation(dcLocation string) (string, bool) { - url, exist := c.tsoAllocators.Load(dcLocation) - if !exist { - return "", false - } - return url.(string), true -} - -// GetTSOAllocatorClientConnByDCLocation returns the TSO allocator gRPC client connection of the given dcLocation. -func (c *tsoClient) GetTSOAllocatorClientConnByDCLocation(dcLocation string) (*grpc.ClientConn, string) { - url, ok := c.tsoAllocators.Load(dcLocation) - if !ok { - log.Fatal("[tso] the allocator leader should exist", zap.String("dc-location", dcLocation)) +// getTSOLeaderClientConn returns the TSO leader gRPC client connection. +func (c *tsoClient) getTSOLeaderClientConn() (*grpc.ClientConn, string) { + url := c.getTSOLeaderURL() + if len(url) == 0 { + log.Fatal("[tso] the tso leader should exist") } cc, ok := c.svcDiscovery.GetClientConns().Load(url) if !ok { - return nil, url.(string) + return nil, url } - return cc.(*grpc.ClientConn), url.(string) + return cc.(*grpc.ClientConn), url } -// AddTSOAllocatorServingURLSwitchedCallback adds callbacks which will be called -// when any global/local tso allocator service endpoint is switched. -func (c *tsoClient) AddTSOAllocatorServingURLSwitchedCallback(callbacks ...func()) { - c.tsoAllocServingURLSwitchedCallback = append(c.tsoAllocServingURLSwitchedCallback, callbacks...) -} - -func (c *tsoClient) updateTSOLocalServURLs(allocatorMap map[string]string) error { - if len(allocatorMap) == 0 { - return nil - } - - updated := false - - // Switch to the new one - for dcLocation, url := range allocatorMap { - if len(url) == 0 { - continue - } - oldURL, exist := c.GetTSOAllocatorServingURLByDCLocation(dcLocation) - if exist && url == oldURL { - continue - } - updated = true - if _, err := c.svcDiscovery.GetOrCreateGRPCConn(url); err != nil { - log.Warn("[tso] failed to connect dc tso allocator serving url", - zap.String("dc-location", dcLocation), - zap.String("serving-url", url), - errs.ZapError(err)) - return err - } - c.tsoAllocators.Store(dcLocation, url) - log.Info("[tso] switch dc tso local allocator serving url", - zap.String("dc-location", dcLocation), - zap.String("new-url", url), - zap.String("old-url", oldURL)) - // Should trigger the update of the connection contexts once the allocator leader is switched. - c.scheduleUpdateTSOConnectionCtxs(dcLocation) - } - - // Garbage collection of the old TSO allocator primaries - c.gcAllocatorServingURL(allocatorMap) - - if updated { - c.scheduleCheckTSODispatcher() - } - - return nil -} - -func (c *tsoClient) updateTSOGlobalServURL(url string) error { - c.tsoAllocators.Store(globalDCLocation, url) - log.Info("[tso] switch dc tso global allocator serving url", - zap.String("dc-location", globalDCLocation), - zap.String("new-url", url)) - c.scheduleUpdateTSOConnectionCtxs(globalDCLocation) - c.scheduleCheckTSODispatcher() +func (c *tsoClient) updateTSOLeaderURL(url string) error { + c.leaderURL.Store(url) + log.Info("[tso] switch the tso leader serving url", zap.String("new-url", url)) + // Try to create the TSO dispatcher if it is not created yet. + c.tryCreateTSODispatcher() + // Update the TSO connection contexts after the dispatcher is ready. + c.scheduleUpdateTSOConnectionCtxs() return nil } -func (c *tsoClient) gcAllocatorServingURL(curAllocatorMap map[string]string) { - // Clean up the old TSO allocators - c.tsoAllocators.Range(func(dcLocationKey, _ any) bool { - dcLocation := dcLocationKey.(string) - // Skip the Global TSO Allocator - if dcLocation == globalDCLocation { - return true - } - if _, exist := curAllocatorMap[dcLocation]; !exist { - log.Info("[tso] delete unused tso allocator", zap.String("dc-location", dcLocation)) - c.tsoAllocators.Delete(dcLocation) - } - return true - }) -} - // backupClientConn gets a grpc client connection of the current reachable and healthy // backup service endpoints randomly. Backup service endpoints are followers in a // quorum-based cluster or secondaries in a primary/secondary configured cluster. @@ -354,28 +223,27 @@ type tsoConnectionContext struct { stream *tsoStream } -// updateConnectionCtxs will choose the proper way to update the connections for the given dc-location. +// updateConnectionCtxs will choose the proper way to update the connections. // It will return a bool to indicate whether the update is successful. -func (c *tsoClient) updateConnectionCtxs(ctx context.Context, dc string, connectionCtxs *sync.Map) bool { +func (c *tsoClient) updateConnectionCtxs(ctx context.Context, connectionCtxs *sync.Map) bool { // Normal connection creating, it will be affected by the `enableForwarding`. createTSOConnection := c.tryConnectToTSO - if c.allowTSOFollowerProxy(dc) { + if c.option.enableForwarding { createTSOConnection = c.tryConnectToTSOWithProxy } - if err := createTSOConnection(ctx, dc, connectionCtxs); err != nil { - log.Error("[tso] update connection contexts failed", zap.String("dc", dc), errs.ZapError(err)) + if err := createTSOConnection(ctx, connectionCtxs); err != nil { + log.Error("[tso] update connection contexts failed", errs.ZapError(err)) return false } return true } -// tryConnectToTSO will try to connect to the TSO allocator leader. If the connection becomes unreachable +// tryConnectToTSO will try to connect to the TSO leader. If the connection becomes unreachable // and enableForwarding is true, it will create a new connection to a follower to do the forwarding, // while a new daemon will be created also to switch back to a normal leader connection ASAP the // connection comes back to normal. func (c *tsoClient) tryConnectToTSO( ctx context.Context, - dc string, connectionCtxs *sync.Map, ) error { var ( @@ -405,7 +273,7 @@ func (c *tsoClient) tryConnectToTSO( // Retry several times before falling back to the follower when the network problem happens for range maxRetryTimes { c.svcDiscovery.ScheduleCheckMemberChanged() - cc, url = c.GetTSOAllocatorClientConnByDCLocation(dc) + cc, url = c.getTSOLeaderClientConn() if _, ok := connectionCtxs.Load(url); ok { // Just trigger the clean up of the stale connection contexts. updateAndClear(url, nil) @@ -448,10 +316,10 @@ func (c *tsoClient) tryConnectToTSO( // encounter the network error backupClientConn, backupURL := c.backupClientConn() if backupClientConn != nil { - log.Info("[tso] fall back to use follower to forward tso stream", zap.String("dc", dc), zap.String("follower-url", backupURL)) - forwardedHost, ok := c.GetTSOAllocatorServingURLByDCLocation(dc) - if !ok { - return errors.Errorf("cannot find the allocator leader in %s", dc) + log.Info("[tso] fall back to use follower to forward tso stream", zap.String("follower-url", backupURL)) + forwardedHost := c.getTSOLeaderURL() + if len(forwardedHost) == 0 { + return errors.Errorf("cannot find the tso leader") } // create the follower stream @@ -462,7 +330,7 @@ func (c *tsoClient) tryConnectToTSO( forwardedHostTrim := trimHTTPPrefix(forwardedHost) addr := trimHTTPPrefix(backupURL) // the goroutine is used to check the network and change back to the original stream - go c.checkAllocator(ctx, cancel, dc, forwardedHostTrim, addr, url, updateAndClear) + go c.checkLeader(ctx, cancel, forwardedHostTrim, addr, url, updateAndClear) requestForwarded.WithLabelValues(forwardedHostTrim, addr).Set(1) updateAndClear(backupURL, &tsoConnectionContext{cctx, cancel, backupURL, stream}) return nil @@ -473,10 +341,10 @@ func (c *tsoClient) tryConnectToTSO( return err } -func (c *tsoClient) checkAllocator( +func (c *tsoClient) checkLeader( ctx context.Context, forwardCancel context.CancelFunc, - dc, forwardedHostTrim, addr, url string, + forwardedHostTrim, addr, url string, updateAndClear func(newAddr string, connectionCtx *tsoConnectionContext), ) { defer func() { @@ -484,14 +352,14 @@ func (c *tsoClient) checkAllocator( forwardCancel() requestForwarded.WithLabelValues(forwardedHostTrim, addr).Set(0) }() - cc, u := c.GetTSOAllocatorClientConnByDCLocation(dc) + cc, u := c.getTSOLeaderClientConn() var healthCli healthpb.HealthClient ticker := time.NewTicker(time.Second) defer ticker.Stop() for { - // the pd/allocator leader change, we need to re-establish the stream + // the tso leader change, we need to re-establish the stream if u != url { - log.Info("[tso] the leader of the allocator leader is changed", zap.String("dc", dc), zap.String("origin", url), zap.String("new", u)) + log.Info("[tso] the tso leader is changed", zap.String("origin", url), zap.String("new", u)) return } if healthCli == nil && cc != nil { @@ -505,11 +373,11 @@ func (c *tsoClient) checkAllocator( }) healthCancel() if err == nil && resp.GetStatus() == healthpb.HealthCheckResponse_SERVING { - // create a stream of the original allocator + // create a stream of the original tso leader cctx, cancel := context.WithCancel(ctx) stream, err := c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.timeout) if err == nil && stream != nil { - log.Info("[tso] recover the original tso stream since the network has become normal", zap.String("dc", dc), zap.String("url", url)) + log.Info("[tso] recover the original tso stream since the network has become normal", zap.String("url", url)) updateAndClear(url, &tsoConnectionContext{cctx, cancel, url, stream}) return } @@ -519,9 +387,8 @@ func (c *tsoClient) checkAllocator( case <-ctx.Done(): return case <-ticker.C: - // To ensure we can get the latest allocator leader - // and once the leader is changed, we can exit this function. - cc, u = c.GetTSOAllocatorClientConnByDCLocation(dc) + // To ensure we can get the latest tso leader and once it's changed, we can exit this function. + cc, u = c.getTSOLeaderClientConn() } } } @@ -530,21 +397,19 @@ func (c *tsoClient) checkAllocator( // a TSO proxy to reduce the pressure of the main serving service endpoint. func (c *tsoClient) tryConnectToTSOWithProxy( ctx context.Context, - dc string, connectionCtxs *sync.Map, ) error { tsoStreamBuilders := c.getAllTSOStreamBuilders() leaderAddr := c.svcDiscovery.GetServingURL() - forwardedHost, ok := c.GetTSOAllocatorServingURLByDCLocation(dc) - if !ok { - return errors.Errorf("cannot find the allocator leader in %s", dc) + forwardedHost := c.getTSOLeaderURL() + if len(forwardedHost) == 0 { + return errors.Errorf("cannot find the tso leader") } // GC the stale one. connectionCtxs.Range(func(addr, cc any) bool { addrStr := addr.(string) if _, ok := tsoStreamBuilders[addrStr]; !ok { log.Info("[tso] remove the stale tso stream", - zap.String("dc", dc), zap.String("addr", addrStr)) cc.(*tsoConnectionContext).cancel() connectionCtxs.Delete(addr) @@ -553,16 +418,15 @@ func (c *tsoClient) tryConnectToTSOWithProxy( }) // Update the missing one. for addr, tsoStreamBuilder := range tsoStreamBuilders { - if _, ok = connectionCtxs.Load(addr); ok { + if _, ok := connectionCtxs.Load(addr); ok { continue } - log.Info("[tso] try to create tso stream", - zap.String("dc", dc), zap.String("addr", addr)) + log.Info("[tso] try to create tso stream", zap.String("addr", addr)) cctx, cancel := context.WithCancel(ctx) // Do not proxy the leader client. if addr != leaderAddr { log.Info("[tso] use follower to forward tso stream to do the proxy", - zap.String("dc", dc), zap.String("addr", addr)) + zap.String("addr", addr)) cctx = grpcutil.BuildForwardContext(cctx, forwardedHost) } // Create the TSO stream. @@ -577,7 +441,7 @@ func (c *tsoClient) tryConnectToTSOWithProxy( continue } log.Error("[tso] create the tso stream failed", - zap.String("dc", dc), zap.String("addr", addr), errs.ZapError(err)) + zap.String("addr", addr), errs.ZapError(err)) cancel() } return nil @@ -609,57 +473,30 @@ func (c *tsoClient) getAllTSOStreamBuilders() map[string]tsoStreamBuilder { return streamBuilders } -func (c *tsoClient) createTSODispatcher(dcLocation string) { - dispatcher := newTSODispatcher(c.ctx, dcLocation, defaultMaxTSOBatchSize, c) - if _, ok := c.tsoDispatcher.LoadOrStore(dcLocation, dispatcher); !ok { - // Create a new dispatcher for the dc-location to handle the TSO requests. - c.wg.Add(1) - go dispatcher.handleDispatcher(&c.wg) - } else { +func (c *tsoClient) tryCreateTSODispatcher() { + // The dispatcher is already created. + if c.getDispatcher() != nil { + return + } + // The TSO leader is not ready. + url := c.getTSOLeaderURL() + if len(url) == 0 { + return + } + dispatcher := newTSODispatcher(c.ctx, defaultMaxTSOBatchSize, c) + c.wg.Add(1) + go dispatcher.handleDispatcher(&c.wg) + // Try to set the dispatcher atomically. + if swapped := c.dispatcher.CompareAndSwap(nil, dispatcher); !swapped { dispatcher.close() } } -func (c *tsoClient) closeTSODispatcher() { - c.tsoDispatcher.Range(func(_, dispatcherInterface any) bool { - if dispatcherInterface != nil { - dispatcherInterface.(*tsoDispatcher).close() - } - return true - }) -} - -func (c *tsoClient) updateTSODispatcher() { - // Set up the new TSO dispatcher and batch controller. - c.GetTSOAllocators().Range(func(dcLocationKey, _ any) bool { - dcLocation := dcLocationKey.(string) - if _, ok := c.getTSODispatcher(dcLocation); !ok { - c.createTSODispatcher(dcLocation) - } - return true - }) - // Clean up the unused TSO dispatcher - c.tsoDispatcher.Range(func(dcLocationKey, dispatcher any) bool { - dcLocation := dcLocationKey.(string) - // Skip the Global TSO Allocator - if dcLocation == globalDCLocation { - return true - } - if _, exist := c.GetTSOAllocators().Load(dcLocation); !exist { - log.Info("[tso] delete unused tso dispatcher", zap.String("dc-location", dcLocation)) - c.tsoDispatcher.Delete(dcLocation) - dispatcher.(*tsoDispatcher).close() - } - return true - }) -} - // dispatchRequest will send the TSO request to the corresponding TSO dispatcher. func (c *tsoClient) dispatchRequest(request *tsoRequest) (bool, error) { - dispatcher, ok := c.getTSODispatcher(request.dcLocation) - if !ok { - err := errs.ErrClientGetTSO.FastGenByArgs(fmt.Sprintf("unknown dc-location %s to the client", request.dcLocation)) - log.Error("[tso] dispatch tso request error", zap.String("dc-location", request.dcLocation), errs.ZapError(err)) + if c.getDispatcher() == nil { + err := errs.ErrClientGetTSO.FastGenByArgs("tso dispatcher is not ready") + log.Error("[tso] dispatch tso request error", errs.ZapError(err)) c.svcDiscovery.ScheduleCheckMemberChanged() // New dispatcher could be created in the meantime, which is retryable. return true, err @@ -681,7 +518,7 @@ func (c *tsoClient) dispatchRequest(request *tsoRequest) (bool, error) { failpoint.Inject("delayDispatchTSORequest", func() { time.Sleep(time.Second) }) - dispatcher.push(request) + c.getDispatcher().push(request) } // Check the contexts again to make sure the request is not been sent to a closed dispatcher. // Never retry on these conditions to prevent unexpected data race. diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index a59dcaa7c61..147cadab633 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -69,7 +69,7 @@ type tsoInfo struct { type tsoServiceProvider interface { getOption() *option getServiceDiscovery() ServiceDiscovery - updateConnectionCtxs(ctx context.Context, dc string, connectionCtxs *sync.Map) bool + updateConnectionCtxs(ctx context.Context, connectionCtxs *sync.Map) bool } const dispatcherCheckRPCConcurrencyInterval = time.Second * 5 @@ -77,7 +77,6 @@ const dispatcherCheckRPCConcurrencyInterval = time.Second * 5 type tsoDispatcher struct { ctx context.Context cancel context.CancelFunc - dc string provider tsoServiceProvider // URL -> *connectionContext @@ -102,7 +101,6 @@ type tsoDispatcher struct { func newTSODispatcher( ctx context.Context, - dc string, maxBatchSize int, provider tsoServiceProvider, ) *tsoDispatcher { @@ -119,7 +117,6 @@ func newTSODispatcher( td := &tsoDispatcher{ ctx: dispatcherCtx, cancel: dispatcherCancel, - dc: dc, provider: provider, connectionCtxs: &sync.Map{}, tsoRequestCh: tsoRequestCh, @@ -141,15 +138,15 @@ func newTSODispatcher( } func (td *tsoDispatcher) watchTSDeadline() { - log.Info("[tso] start tso deadline watcher", zap.String("dc-location", td.dc)) - defer log.Info("[tso] exit tso deadline watcher", zap.String("dc-location", td.dc)) + log.Info("[tso] start tso deadline watcher") + defer log.Info("[tso] exit tso deadline watcher") for { select { case d := <-td.tsDeadlineCh: select { case <-d.timer.C: log.Error("[tso] tso request is canceled due to timeout", - zap.String("dc-location", td.dc), errs.ZapError(errs.ErrClientGetTSOTimeout)) + errs.ZapError(errs.ErrClientGetTSOTimeout)) d.cancel() timerutil.GlobalTimerPool.Put(d.timer) case <-d.done: @@ -191,7 +188,6 @@ func (td *tsoDispatcher) push(request *tsoRequest) { func (td *tsoDispatcher) handleDispatcher(wg *sync.WaitGroup) { var ( ctx = td.ctx - dc = td.dc provider = td.provider svcDiscovery = provider.getServiceDiscovery() option = provider.getOption() @@ -199,10 +195,10 @@ func (td *tsoDispatcher) handleDispatcher(wg *sync.WaitGroup) { tsoBatchController *batchController[*tsoRequest] ) - log.Info("[tso] tso dispatcher created", zap.String("dc-location", dc)) + log.Info("[tso] tso dispatcher created") // Clean up the connectionCtxs when the dispatcher exits. defer func() { - log.Info("[tso] exit tso dispatcher", zap.String("dc-location", dc)) + log.Info("[tso] exit tso dispatcher") // Cancel all connections. connectionCtxs.Range(func(_, cc any) bool { cc.(*tsoConnectionContext).cancel() @@ -256,7 +252,7 @@ tsoBatchLoop: if err = td.checkTSORPCConcurrency(ctx, maxBatchWaitInterval, currentBatchStartTime); err != nil { // checkTSORPCConcurrency can only fail due to `ctx` being invalidated. log.Info("[tso] stop checking tso rpc concurrency configurations due to context canceled", - zap.String("dc-location", dc), zap.Error(err)) + zap.Error(err)) return } @@ -265,11 +261,9 @@ tsoBatchLoop: // otherwise the upper caller may get blocked on waiting for the results. if err = tsoBatchController.fetchPendingRequests(ctx, td.tsoRequestCh, td.tokenCh, maxBatchWaitInterval); err != nil { if err == context.Canceled { - log.Info("[tso] stop fetching the pending tso requests due to context canceled", - zap.String("dc-location", dc)) + log.Info("[tso] stop fetching the pending tso requests due to context canceled") } else { log.Error("[tso] fetch pending tso requests error", - zap.String("dc-location", dc), zap.Error(errs.ErrClientGetTSO.FastGenByArgs(err.Error()))) } return @@ -296,8 +290,8 @@ tsoBatchLoop: } // Check stream and retry if necessary. if stream == nil { - log.Info("[tso] tso stream is not ready", zap.String("dc", dc)) - if provider.updateConnectionCtxs(ctx, dc, connectionCtxs) { + log.Info("[tso] tso stream is not ready") + if provider.updateConnectionCtxs(ctx, connectionCtxs) { continue streamChoosingLoop } timer := time.NewTimer(retryInterval) @@ -309,7 +303,7 @@ tsoBatchLoop: return case <-streamLoopTimer.C: err = errs.ErrClientCreateTSOStream.FastGenByArgs(errs.RetryTimeoutErr) - log.Error("[tso] create tso stream error", zap.String("dc-location", dc), errs.ZapError(err)) + log.Error("[tso] create tso stream error", errs.ZapError(err)) svcDiscovery.ScheduleCheckMemberChanged() // Finish the collected requests if the stream is failed to be created. td.cancelCollectedRequests(tsoBatchController, invalidStreamID, errors.WithStack(err)) @@ -322,7 +316,7 @@ tsoBatchLoop: } select { case <-streamCtx.Done(): - log.Info("[tso] tso stream is canceled", zap.String("dc", dc), zap.String("stream-url", streamURL)) + log.Info("[tso] tso stream is canceled", zap.String("stream-url", streamURL)) // Set `stream` to nil and remove this stream from the `connectionCtxs` due to being canceled. connectionCtxs.Delete(streamURL) cancel() @@ -388,7 +382,7 @@ tsoBatchLoop: if err != nil { // There should not be other kinds of errors. log.Info("[tso] stop fetching the pending tso requests due to context canceled", - zap.String("dc-location", dc), zap.Error(err)) + zap.Error(err)) td.cancelCollectedRequests(tsoBatchController, invalidStreamID, errors.WithStack(ctx.Err())) return } @@ -405,7 +399,7 @@ tsoBatchLoop: case td.tsDeadlineCh <- dl: } // processRequests guarantees that the collected requests could be finished properly. - err = td.processRequests(stream, dc, tsoBatchController, done) + err = td.processRequests(stream, tsoBatchController, done) // If error happens during tso stream handling, reset stream and run the next trial. if err == nil { // A nil error returned by `processRequests` indicates that the request batch is started successfully. @@ -440,7 +434,6 @@ func (td *tsoDispatcher) handleProcessRequestError(ctx context.Context, bo *retr svcDiscovery.ScheduleCheckMemberChanged() log.Error("[tso] getTS error after processing requests", - zap.String("dc-location", td.dc), zap.String("stream-url", streamURL), zap.Error(errs.ErrClientGetTSO.FastGenByArgs(err.Error()))) // Set `stream` to nil and remove this stream from the `connectionCtxs` due to error. @@ -460,7 +453,7 @@ func (td *tsoDispatcher) handleProcessRequestError(ctx context.Context, bo *retr // will cancel the current stream, then the EOF error caused by cancel() // should not trigger the updateConnectionCtxs here. // So we should only call it when the leader changes. - td.provider.updateConnectionCtxs(ctx, td.dc, td.connectionCtxs) + td.provider.updateConnectionCtxs(ctx, td.connectionCtxs) } return true @@ -470,14 +463,13 @@ func (td *tsoDispatcher) handleProcessRequestError(ctx context.Context, bo *retr func (td *tsoDispatcher) connectionCtxsUpdater() { var ( ctx = td.ctx - dc = td.dc connectionCtxs = td.connectionCtxs provider = td.provider option = td.provider.getOption() updateTicker = &time.Ticker{} ) - log.Info("[tso] start tso connection contexts updater", zap.String("dc-location", dc)) + log.Info("[tso] start tso connection contexts updater") setNewUpdateTicker := func(ticker *time.Ticker) { if updateTicker.C != nil { updateTicker.Stop() @@ -488,19 +480,14 @@ func (td *tsoDispatcher) connectionCtxsUpdater() { defer setNewUpdateTicker(nil) for { - provider.updateConnectionCtxs(ctx, dc, connectionCtxs) + provider.updateConnectionCtxs(ctx, connectionCtxs) select { case <-ctx.Done(): - log.Info("[tso] exit tso connection contexts updater", zap.String("dc-location", dc)) + log.Info("[tso] exit tso connection contexts updater") return case <-option.enableTSOFollowerProxyCh: - // TODO: implement support of TSO Follower Proxy for the Local TSO. - if dc != globalDCLocation { - continue - } enableTSOFollowerProxy := option.getEnableTSOFollowerProxy() log.Info("[tso] tso follower proxy status changed", - zap.String("dc-location", dc), zap.Bool("enable", enableTSOFollowerProxy)) if enableTSOFollowerProxy && updateTicker.C == nil { // Because the TSO Follower Proxy is enabled, @@ -541,7 +528,7 @@ func chooseStream(connectionCtxs *sync.Map) (connectionCtx *tsoConnectionContext // `close(done)` will be called at the same time when finishing the requests. // If this function returns a non-nil error, the requests will always be canceled synchronously. func (td *tsoDispatcher) processRequests( - stream *tsoStream, dcLocation string, tbc *batchController[*tsoRequest], done chan struct{}, + stream *tsoStream, tbc *batchController[*tsoRequest], done chan struct{}, ) error { // `done` must be guaranteed to be eventually called. var ( @@ -604,7 +591,7 @@ func (td *tsoDispatcher) processRequests( err := stream.processRequests( clusterID, keyspaceID, reqKeyspaceGroupID, - dcLocation, count, tbc.extraBatchingStartTime, cb) + count, tbc.extraBatchingStartTime, cb) if err != nil { close(done) @@ -650,7 +637,6 @@ func (td *tsoDispatcher) checkMonotonicity( if lastTSOInfo != nil { if lastTSOInfo.respKeyspaceGroupID != curTSOInfo.respKeyspaceGroupID { log.Info("[tso] keyspace group changed", - zap.String("dc-location", td.dc), zap.Uint32("old-group-id", lastTSOInfo.respKeyspaceGroupID), zap.Uint32("new-group-id", curTSOInfo.respKeyspaceGroupID)) } @@ -660,7 +646,6 @@ func (td *tsoDispatcher) checkMonotonicity( // last time. if tsoutil.TSLessEqual(curTSOInfo.physical, firstLogical, lastTSOInfo.physical, lastTSOInfo.logical) { log.Panic("[tso] timestamp fallback", - zap.String("dc-location", td.dc), zap.Uint32("keyspace", keyspaceID), zap.String("last-ts", fmt.Sprintf("(%d, %d)", lastTSOInfo.physical, lastTSOInfo.logical)), zap.String("cur-ts", fmt.Sprintf("(%d, %d)", curTSOInfo.physical, firstLogical)), diff --git a/client/tso_dispatcher_test.go b/client/tso_dispatcher_test.go index 194c9bde455..2eb30066532 100644 --- a/client/tso_dispatcher_test.go +++ b/client/tso_dispatcher_test.go @@ -50,7 +50,7 @@ func (*mockTSOServiceProvider) getServiceDiscovery() ServiceDiscovery { return NewMockPDServiceDiscovery([]string{mockStreamURL}, nil) } -func (m *mockTSOServiceProvider) updateConnectionCtxs(ctx context.Context, _dc string, connectionCtxs *sync.Map) bool { +func (m *mockTSOServiceProvider) updateConnectionCtxs(ctx context.Context, connectionCtxs *sync.Map) bool { // Avoid concurrent updating in the background updating goroutine and active updating in the dispatcher loop when // stream is missing. m.updateConnMu.Lock() @@ -102,14 +102,13 @@ func (s *testTSODispatcherSuite) SetupTest() { created.Store(true) return s.stream } - s.dispatcher = newTSODispatcher(context.Background(), globalDCLocation, defaultMaxTSOBatchSize, newMockTSOServiceProvider(s.option, createStream)) + s.dispatcher = newTSODispatcher(context.Background(), defaultMaxTSOBatchSize, newMockTSOServiceProvider(s.option, createStream)) s.reqPool = &sync.Pool{ New: func() any { return &tsoRequest{ - done: make(chan error, 1), - physical: 0, - logical: 0, - dcLocation: globalDCLocation, + done: make(chan error, 1), + physical: 0, + logical: 0, } }, } @@ -331,10 +330,9 @@ func BenchmarkTSODispatcherHandleRequests(b *testing.B) { reqPool := &sync.Pool{ New: func() any { return &tsoRequest{ - done: make(chan error, 1), - physical: 0, - logical: 0, - dcLocation: globalDCLocation, + done: make(chan error, 1), + physical: 0, + logical: 0, } }, } @@ -349,7 +347,7 @@ func BenchmarkTSODispatcherHandleRequests(b *testing.B) { return req } - dispatcher := newTSODispatcher(ctx, globalDCLocation, defaultMaxTSOBatchSize, newMockTSOServiceProvider(newOption(), nil)) + dispatcher := newTSODispatcher(ctx, defaultMaxTSOBatchSize, newMockTSOServiceProvider(newOption(), nil)) var wg sync.WaitGroup wg.Add(1) diff --git a/client/tso_request.go b/client/tso_request.go index 441e92a4390..29654752cd0 100644 --- a/client/tso_request.go +++ b/client/tso_request.go @@ -40,7 +40,6 @@ type tsoRequest struct { done chan error physical int64 logical int64 - dcLocation string // The identifier of the RPC stream in which the request is processed. streamID string diff --git a/client/tso_service_discovery.go b/client/tso_service_discovery.go index 634b4211e38..b8debf05efe 100644 --- a/client/tso_service_discovery.go +++ b/client/tso_service_discovery.go @@ -51,7 +51,7 @@ const ( ) var _ ServiceDiscovery = (*tsoServiceDiscovery)(nil) -var _ tsoAllocatorEventSource = (*tsoServiceDiscovery)(nil) +var _ tsoEventSource = (*tsoServiceDiscovery)(nil) // keyspaceGroupSvcDiscovery is used for discovering the serving endpoints of the keyspace // group to which the keyspace belongs @@ -136,11 +136,8 @@ type tsoServiceDiscovery struct { // URL -> a gRPC connection clientConns sync.Map // Store as map[string]*grpc.ClientConn - // localAllocPrimariesUpdatedCb will be called when the local tso allocator primary list is updated. - // The input is a map {DC Location -> Leader URL} - localAllocPrimariesUpdatedCb tsoLocalServURLsUpdatedFunc - // globalAllocPrimariesUpdatedCb will be called when the local tso allocator primary list is updated. - globalAllocPrimariesUpdatedCb tsoGlobalServURLUpdatedFunc + // tsoLeaderUpdatedCb will be called when the TSO leader is updated. + tsoLeaderUpdatedCb tsoLeaderURLUpdatedFunc checkMembershipCh chan struct{} @@ -360,22 +357,15 @@ func (*tsoServiceDiscovery) AddServingURLSwitchedCallback(...func()) {} // in a primary/secondary configured cluster is changed. func (*tsoServiceDiscovery) AddServiceURLsSwitchedCallback(...func()) {} -// SetTSOLocalServURLsUpdatedCallback adds a callback which will be called when the local tso -// allocator leader list is updated. -func (c *tsoServiceDiscovery) SetTSOLocalServURLsUpdatedCallback(callback tsoLocalServURLsUpdatedFunc) { - c.localAllocPrimariesUpdatedCb = callback -} - -// SetTSOGlobalServURLUpdatedCallback adds a callback which will be called when the global tso -// allocator leader is updated. -func (c *tsoServiceDiscovery) SetTSOGlobalServURLUpdatedCallback(callback tsoGlobalServURLUpdatedFunc) { +// SetTSOLeaderURLUpdatedCallback adds a callback which will be called when the TSO leader is updated. +func (c *tsoServiceDiscovery) SetTSOLeaderURLUpdatedCallback(callback tsoLeaderURLUpdatedFunc) { url := c.getPrimaryURL() if len(url) > 0 { if err := callback(url); err != nil { log.Error("[tso] failed to call back when tso global service url update", zap.String("url", url), errs.ZapError(err)) } } - c.globalAllocPrimariesUpdatedCb = callback + c.tsoLeaderUpdatedCb = callback } // GetServiceClient implements ServiceDiscovery @@ -404,8 +394,8 @@ func (c *tsoServiceDiscovery) getSecondaryURLs() []string { func (c *tsoServiceDiscovery) afterPrimarySwitched(oldPrimary, newPrimary string) error { // Run callbacks - if c.globalAllocPrimariesUpdatedCb != nil { - if err := c.globalAllocPrimariesUpdatedCb(newPrimary); err != nil { + if c.tsoLeaderUpdatedCb != nil { + if err := c.tsoLeaderUpdatedCb(newPrimary); err != nil { return err } } diff --git a/client/tso_stream.go b/client/tso_stream.go index 142ad71c6b9..887e0ee0292 100644 --- a/client/tso_stream.go +++ b/client/tso_stream.go @@ -115,8 +115,7 @@ type tsoRequestResult struct { } type grpcTSOStreamAdapter interface { - Send(clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, - count int64) error + Send(clusterID uint64, keyspaceID, keyspaceGroupID uint32, count int64) error Recv() (tsoRequestResult, error) } @@ -125,13 +124,12 @@ type pdTSOStreamAdapter struct { } // Send implements the grpcTSOStreamAdapter interface. -func (s pdTSOStreamAdapter) Send(clusterID uint64, _, _ uint32, dcLocation string, count int64) error { +func (s pdTSOStreamAdapter) Send(clusterID uint64, _, _ uint32, count int64) error { req := &pdpb.TsoRequest{ Header: &pdpb.RequestHeader{ ClusterId: clusterID, }, - Count: uint32(count), - DcLocation: dcLocation, + Count: uint32(count), } return s.stream.Send(req) } @@ -156,15 +154,14 @@ type tsoTSOStreamAdapter struct { } // Send implements the grpcTSOStreamAdapter interface. -func (s tsoTSOStreamAdapter) Send(clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, count int64) error { +func (s tsoTSOStreamAdapter) Send(clusterID uint64, keyspaceID, keyspaceGroupID uint32, count int64) error { req := &tsopb.TsoRequest{ Header: &tsopb.RequestHeader{ ClusterId: clusterID, KeyspaceId: keyspaceID, KeyspaceGroupId: keyspaceGroupID, }, - Count: uint32(count), - DcLocation: dcLocation, + Count: uint32(count), } return s.stream.Send(req) } @@ -268,7 +265,7 @@ func (s *tsoStream) getServerURL() string { // It's guaranteed that the `callback` will be called, but when the request is failed to be scheduled, the callback // will be ignored. func (s *tsoStream) processRequests( - clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, count int64, batchStartTime time.Time, callback onFinishedCallback, + clusterID uint64, keyspaceID, keyspaceGroupID uint32, count int64, batchStartTime time.Time, callback onFinishedCallback, ) error { start := time.Now() @@ -305,7 +302,7 @@ func (s *tsoStream) processRequests( } s.state.Store(prevState) - if err := s.stream.Send(clusterID, keyspaceID, keyspaceGroupID, dcLocation, count); err != nil { + if err := s.stream.Send(clusterID, keyspaceID, keyspaceGroupID, count); err != nil { // As the request is already put into `pendingRequests`, the request should finally be canceled by the recvLoop. // So skip returning error here to avoid // if err == io.EOF { diff --git a/client/tso_stream_test.go b/client/tso_stream_test.go index 6595ed2c13a..12fec0c7ddd 100644 --- a/client/tso_stream_test.go +++ b/client/tso_stream_test.go @@ -76,7 +76,7 @@ func newMockTSOStreamImpl(ctx context.Context, resultMode resultMode) *mockTSOSt } } -func (s *mockTSOStreamImpl) Send(clusterID uint64, _keyspaceID, keyspaceGroupID uint32, _dcLocation string, count int64) error { +func (s *mockTSOStreamImpl) Send(clusterID uint64, _keyspaceID, keyspaceGroupID uint32, count int64) error { select { case <-s.ctx.Done(): return s.ctx.Err() @@ -305,7 +305,7 @@ func (s *testTSOStreamSuite) getResult(ch <-chan callbackInvocation) callbackInv func (s *testTSOStreamSuite) processRequestWithResultCh(count int64) (<-chan callbackInvocation, error) { ch := make(chan callbackInvocation, 1) - err := s.stream.processRequests(1, 2, 3, globalDCLocation, count, time.Now(), func(result tsoRequestResult, reqKeyspaceGroupID uint32, err error) { + err := s.stream.processRequests(1, 2, 3, count, time.Now(), func(result tsoRequestResult, reqKeyspaceGroupID uint32, err error) { if err == nil { s.re.Equal(uint32(3), reqKeyspaceGroupID) s.re.Equal(uint32(0), result.suffixBits) @@ -357,7 +357,7 @@ func (s *testTSOStreamSuite) TestTSOStreamBasic() { // After an error from the (simulated) RPC stream, the tsoStream should be in a broken status and can't accept // new request anymore. - err := s.stream.processRequests(1, 2, 3, globalDCLocation, 1, time.Now(), func(_result tsoRequestResult, _reqKeyspaceGroupID uint32, _err error) { + err := s.stream.processRequests(1, 2, 3, 1, time.Now(), func(_result tsoRequestResult, _reqKeyspaceGroupID uint32, _err error) { panic("unreachable") }) s.re.Error(err) @@ -621,7 +621,7 @@ func BenchmarkTSOStreamSendRecv(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - err := stream.processRequests(1, 1, 1, globalDCLocation, 1, now, func(result tsoRequestResult, _ uint32, err error) { + err := stream.processRequests(1, 1, 1, 1, now, func(result tsoRequestResult, _ uint32, err error) { if err != nil { panic(err) } diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index ea6cc872c08..b52fe389a9c 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -31,8 +31,6 @@ import ( "time" "github.com/docker/go-units" - "github.com/opentracing/basictracer-go" - "github.com/opentracing/opentracing-go" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/meta_storagepb" "github.com/pingcap/kvproto/pkg/metapb" @@ -40,14 +38,11 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" pd "github.com/tikv/pd/client" - clierrs "github.com/tikv/pd/client/errs" "github.com/tikv/pd/client/retry" "github.com/tikv/pd/pkg/core" - "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/mcs/utils/constant" "github.com/tikv/pd/pkg/mock/mockid" "github.com/tikv/pd/pkg/storage/endpoint" - "github.com/tikv/pd/pkg/tso" "github.com/tikv/pd/pkg/utils/assertutil" "github.com/tikv/pd/pkg/utils/keypath" "github.com/tikv/pd/pkg/utils/testutil" @@ -252,70 +247,6 @@ func TestGetTSAfterTransferLeader(t *testing.T) { re.NoError(err) } -func TestTSOAllocatorLeader(t *testing.T) { - re := require.New(t) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - dcLocationConfig := map[string]string{ - "pd1": "dc-1", - "pd2": "dc-2", - "pd3": "dc-3", - } - dcLocationNum := len(dcLocationConfig) - cluster, err := tests.NewTestCluster(ctx, dcLocationNum, func(conf *config.Config, serverName string) { - conf.EnableLocalTSO = true - conf.Labels[config.ZoneLabel] = dcLocationConfig[serverName] - }) - re.NoError(err) - defer cluster.Destroy() - - err = cluster.RunInitialServers() - re.NoError(err) - cluster.WaitAllLeaders(re, dcLocationConfig) - - var ( - testServers = cluster.GetServers() - endpoints = make([]string, 0, len(testServers)) - endpointsMap = make(map[string]string) - ) - for _, s := range testServers { - endpoints = append(endpoints, s.GetConfig().AdvertiseClientUrls) - endpointsMap[s.GetServer().GetMemberInfo().GetName()] = s.GetConfig().AdvertiseClientUrls - } - var allocatorLeaderMap = make(map[string]string) - for _, dcLocation := range dcLocationConfig { - var pdName string - testutil.Eventually(re, func() bool { - pdName = cluster.WaitAllocatorLeader(dcLocation) - return len(pdName) > 0 - }) - allocatorLeaderMap[dcLocation] = pdName - } - cli := setupCli(ctx, re, endpoints) - defer cli.Close() - innerCli, ok := cli.(interface{ GetServiceDiscovery() pd.ServiceDiscovery }) - re.True(ok) - - // Check allocator leaders URL map. - cli.Close() - for dcLocation, url := range getTSOAllocatorServingEndpointURLs(cli.(TSOAllocatorsGetter)) { - if dcLocation == tso.GlobalDCLocation { - urls := innerCli.GetServiceDiscovery().GetServiceURLs() - sort.Strings(urls) - sort.Strings(endpoints) - re.Equal(endpoints, urls) - continue - } - pdName, exist := allocatorLeaderMap[dcLocation] - re.True(exist) - re.NotEmpty(pdName) - pdURL, exist := endpointsMap[pdName] - re.True(exist) - re.NotEmpty(pdURL) - re.Equal(pdURL, url) - } -} - func TestTSOFollowerProxy(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) @@ -480,144 +411,6 @@ func TestUnavailableTimeAfterLeaderIsReady(t *testing.T) { re.Less(maxUnavailableTime.UnixMilli(), leaderReadyTime.Add(1*time.Second).UnixMilli()) } -// TODO: migrate the Local/Global TSO tests to TSO integration test folder. -func TestGlobalAndLocalTSO(t *testing.T) { - re := require.New(t) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - dcLocationConfig := map[string]string{ - "pd1": "dc-1", - "pd2": "dc-2", - "pd3": "dc-3", - } - dcLocationNum := len(dcLocationConfig) - cluster, err := tests.NewTestCluster(ctx, dcLocationNum, func(conf *config.Config, serverName string) { - conf.EnableLocalTSO = true - conf.Labels[config.ZoneLabel] = dcLocationConfig[serverName] - }) - re.NoError(err) - defer cluster.Destroy() - - endpoints := runServer(re, cluster) - cli := setupCli(ctx, re, endpoints) - defer cli.Close() - - // Wait for all nodes becoming healthy. - time.Sleep(time.Second * 5) - - // Join a new dc-location - pd4, err := cluster.Join(ctx, func(conf *config.Config, _ string) { - conf.EnableLocalTSO = true - conf.Labels[config.ZoneLabel] = "dc-4" - }) - re.NoError(err) - err = pd4.Run() - re.NoError(err) - dcLocationConfig["pd4"] = "dc-4" - cluster.CheckClusterDCLocation() - cluster.WaitAllLeaders(re, dcLocationConfig) - - // Test a nonexistent dc-location for Local TSO - p, l, err := cli.GetLocalTS(context.TODO(), "nonexistent-dc") - re.Equal(int64(0), p) - re.Equal(int64(0), l, int64(0)) - re.Error(err) - re.Contains(err.Error(), "unknown dc-location") - - wg := &sync.WaitGroup{} - requestGlobalAndLocalTSO(re, wg, dcLocationConfig, cli) - - // assert global tso after resign leader - re.NoError(failpoint.Enable("github.com/tikv/pd/client/skipUpdateMember", `return(true)`)) - err = cluster.ResignLeader() - re.NoError(err) - re.NotEmpty(cluster.WaitLeader()) - _, _, err = cli.GetTS(ctx) - re.Error(err) - re.True(clierrs.IsLeaderChange(err)) - _, _, err = cli.GetTS(ctx) - re.NoError(err) - re.NoError(failpoint.Disable("github.com/tikv/pd/client/skipUpdateMember")) - - recorder := basictracer.NewInMemoryRecorder() - tracer := basictracer.New(recorder) - span := tracer.StartSpan("trace") - ctx = opentracing.ContextWithSpan(ctx, span) - future := cli.GetLocalTSAsync(ctx, "error-dc") - spans := recorder.GetSpans() - re.Len(spans, 1) - _, _, err = future.Wait() - re.Error(err) - spans = recorder.GetSpans() - re.Len(spans, 1) - _, _, err = cli.GetTS(ctx) - re.NoError(err) - spans = recorder.GetSpans() - re.Len(spans, 3) - - // Test the TSO follower proxy while enabling the Local TSO. - cli.UpdateOption(pd.EnableTSOFollowerProxy, true) - // Sleep a while here to prevent from canceling the ongoing TSO request. - time.Sleep(time.Millisecond * 50) - requestGlobalAndLocalTSO(re, wg, dcLocationConfig, cli) - cli.UpdateOption(pd.EnableTSOFollowerProxy, false) - time.Sleep(time.Millisecond * 50) - requestGlobalAndLocalTSO(re, wg, dcLocationConfig, cli) -} - -func requestGlobalAndLocalTSO( - re *require.Assertions, - wg *sync.WaitGroup, - dcLocationConfig map[string]string, - cli pd.Client, -) { - for _, dcLocation := range dcLocationConfig { - wg.Add(tsoRequestConcurrencyNumber) - for range tsoRequestConcurrencyNumber { - go func(dc string) { - defer wg.Done() - var lastTS uint64 - for range tsoRequestRound { - globalPhysical1, globalLogical1, err := cli.GetTS(context.TODO()) - // The allocator leader may be changed due to the environment issue. - if err != nil { - re.ErrorContains(err, errs.NotLeaderErr) - } - globalTS1 := tsoutil.ComposeTS(globalPhysical1, globalLogical1) - localPhysical, localLogical, err := cli.GetLocalTS(context.TODO(), dc) - if err != nil { - re.ErrorContains(err, errs.NotLeaderErr) - } - localTS := tsoutil.ComposeTS(localPhysical, localLogical) - globalPhysical2, globalLogical2, err := cli.GetTS(context.TODO()) - if err != nil { - re.ErrorContains(err, errs.NotLeaderErr) - } - globalTS2 := tsoutil.ComposeTS(globalPhysical2, globalLogical2) - re.Less(lastTS, globalTS1) - re.Less(globalTS1, localTS) - re.Less(localTS, globalTS2) - lastTS = globalTS2 - } - re.Positive(lastTS) - }(dcLocation) - } - } - wg.Wait() -} - -// GetTSOAllocators defines the TSO allocators getter. -type TSOAllocatorsGetter interface{ GetTSOAllocators() *sync.Map } - -func getTSOAllocatorServingEndpointURLs(c TSOAllocatorsGetter) map[string]string { - allocatorLeaders := make(map[string]string) - c.GetTSOAllocators().Range(func(dcLocation, url any) bool { - allocatorLeaders[dcLocation.(string)] = url.(string) - return true - }) - return allocatorLeaders -} - func TestCustomTimeout(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) diff --git a/tests/integrations/go.mod b/tests/integrations/go.mod index 86b0a5399e3..4a90239e86d 100644 --- a/tests/integrations/go.mod +++ b/tests/integrations/go.mod @@ -124,8 +124,7 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/oleiade/reflections v1.0.1 // indirect - github.com/opentracing/basictracer-go v1.1.0 - github.com/opentracing/opentracing-go v1.2.0 + github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/pelletier/go-toml/v2 v2.2.2 // indirect github.com/petermattis/goid v0.0.0-20240813172612-4fcff4a6cae7 // indirect github.com/phf/go-queue v0.0.0-20170504031614-9abe38d0371d // indirect diff --git a/tests/integrations/go.sum b/tests/integrations/go.sum index 8012a5b6ea8..1903a580315 100644 --- a/tests/integrations/go.sum +++ b/tests/integrations/go.sum @@ -186,7 +186,6 @@ github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MG github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v0.0.0-20171007142547-342cbe0a0415/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v0.0.0-20180717141946-636bf0302bc9/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= -github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang-jwt/jwt v3.2.1+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= @@ -295,7 +294,6 @@ github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/u github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= -github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= @@ -357,8 +355,6 @@ github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042 github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.20.1 h1:PA/3qinGoukvymdIDV8pii6tiZgC8kbmJO6Z5+b002Q= github.com/onsi/gomega v1.20.1/go.mod h1:DtrZpjmvpn2mPm4YWQa0/ALMDj9v4YxLgojwPeREyVo= -github.com/opentracing/basictracer-go v1.1.0 h1:Oa1fTSBvAl8pa3U+IJYqrKm0NALwH9OsgwOqDv4xJW0= -github.com/opentracing/basictracer-go v1.1.0/go.mod h1:V2HZueSJEp879yv285Aap1BS69fQMD+MNP1mRs6mBQc= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs= github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= @@ -609,7 +605,6 @@ golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTk golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= -golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 h1:2M3HP5CCK1Si9FQhwnzYhXdG6DXeebvUHFpre8QvbyI= golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= @@ -631,7 +626,6 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20200421231249-e086a090c8fd/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= @@ -704,7 +698,6 @@ golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= diff --git a/tools/pd-tso-bench/main.go b/tools/pd-tso-bench/main.go index dda4f364519..bcf0ff6eb40 100644 --- a/tools/pd-tso-bench/main.go +++ b/tools/pd-tso-bench/main.go @@ -49,7 +49,6 @@ var ( concurrency = flag.Int("c", 1000, "concurrency") count = flag.Int("count", 1, "the count number that the test will run") duration = flag.Duration("duration", 60*time.Second, "how many seconds the test will last") - dcLocation = flag.String("dc", "global", "which dc-location this bench will request") verbose = flag.Bool("v", false, "output statistics info every interval and output metrics info at the end") interval = flag.Duration("interval", time.Second, "interval to output the statistics") caPath = flag.String("cacert", "", "path of file that contains list of trusted SSL CAs") @@ -114,7 +113,7 @@ func bench(mainCtx context.Context) { ctx, cancel := context.WithCancel(mainCtx) // To avoid the first time high latency. for idx, pdCli := range pdClients { - _, _, err := pdCli.GetLocalTS(ctx, *dcLocation) + _, _, err := pdCli.GetTS(ctx) if err != nil { log.Fatal("get first time tso failed", zap.Int("client-number", idx), zap.Error(err)) } @@ -395,7 +394,7 @@ func reqWorker(ctx context.Context, pdClients []pd.Client, clientIdx int, durCh } } } - _, _, err = pdCli.GetLocalTS(reqCtx, *dcLocation) + _, _, err = pdCli.GetTS(reqCtx) if errors.Cause(err) == context.Canceled { if ticker != nil { ticker.Stop()