From 1608b2d5d3f45be8ecc22c77a525b093e1e167d6 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Tue, 12 Nov 2024 17:11:09 +0800 Subject: [PATCH] Remove the suffix bits Signed-off-by: JmPotato --- client/client.go | 8 +++----- client/pd_service_discovery.go | 3 ++- client/tso_client.go | 22 +++++++++++++--------- client/tso_dispatcher.go | 18 +++++++++--------- client/tso_stream.go | 3 --- client/tso_stream_test.go | 3 --- client/utils/tsoutil/tsoutil.go | 5 ----- tests/integrations/mcs/tso/proxy_test.go | 4 ++-- tests/integrations/tso/testutil.go | 2 +- tools/pd-tso-bench/README.md | 2 -- 10 files changed, 30 insertions(+), 40 deletions(-) diff --git a/client/client.go b/client/client.go index 0c2ccbd3805c..6633b295869d 100644 --- a/client/client.go +++ b/client/client.go @@ -34,7 +34,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/tikv/pd/client/errs" "github.com/tikv/pd/client/utils/tlsutil" - "github.com/tikv/pd/client/utils/tsoutil" "go.uber.org/zap" ) @@ -774,9 +773,8 @@ func (c *client) GetTS(ctx context.Context) (physical int64, logical int64, err } // GetLocalTS implements the TSOClient interface. -func (c *client) GetLocalTS(ctx context.Context, dcLocation string) (physical int64, logical int64, err error) { - resp := c.GetLocalTSAsync(ctx, dcLocation) - return resp.Wait() +func (c *client) GetLocalTS(ctx context.Context, _ string) (physical int64, logical int64, err error) { + return c.GetTS(ctx) } // GetMinTS implements the TSOClient interface. @@ -822,7 +820,7 @@ func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, e } minTS := resp.GetTimestamp() - return minTS.Physical, tsoutil.AddLogical(minTS.Logical, 0, minTS.SuffixBits), nil + return minTS.Physical, minTS.Logical, nil } func handleRegionResponse(res *pdpb.GetRegionResponse) *Region { diff --git a/client/pd_service_discovery.go b/client/pd_service_discovery.go index c332ee7066b3..8e82056b4cc3 100644 --- a/client/pd_service_discovery.go +++ b/client/pd_service_discovery.go @@ -384,8 +384,9 @@ func (c *pdServiceBalancer) get() (ret ServiceClient) { type updateKeyspaceIDFunc func() error type tsoLeaderURLUpdatedFunc func(string) error +// tsoEventSource subscribes to events related to changes in the TSO leader/primary from the service discovery. type tsoEventSource interface { - // SetTSOLeaderURLUpdatedCallback adds a callback which will be called when the TSO leader is updated. + // SetTSOLeaderURLUpdatedCallback adds a callback which will be called when the TSO leader/primary is updated. SetTSOLeaderURLUpdatedCallback(callback tsoLeaderURLUpdatedFunc) } diff --git a/client/tso_client.go b/client/tso_client.go index 0cbc6280143f..d5045228caa6 100644 --- a/client/tso_client.go +++ b/client/tso_client.go @@ -53,10 +53,12 @@ type TSOClient interface { GetMinTS(ctx context.Context) (int64, int64, error) // GetLocalTS gets a local timestamp from PD or TSO microservice. - // Deprecated: Local TSO will be completely removed in the future. + // Deprecated: Local TSO will be completely removed in the future. Currently, regardless of the + // parameters passed in, this method will default to returning the global TSO. GetLocalTS(ctx context.Context, _ string) (int64, int64, error) // GetLocalTSAsync gets a local timestamp from PD or TSO microservice, without block the caller. - // Deprecated: Local TSO will be completely removed in the future. + // Deprecated: Local TSO will be completely removed in the future. Currently, regardless of the + // parameters passed in, this method will default to returning the global TSO. GetLocalTSAsync(ctx context.Context, _ string) TSFuture } @@ -155,7 +157,7 @@ func (c *tsoClient) getTSORequest(ctx context.Context) *tsoRequest { return req } -func (c *tsoClient) getTSOLeaderURL() string { +func (c *tsoClient) getLeaderURL() string { url := c.leaderURL.Load() if url == nil { return "" @@ -165,7 +167,7 @@ func (c *tsoClient) getTSOLeaderURL() string { // getTSOLeaderClientConn returns the TSO leader gRPC client connection. func (c *tsoClient) getTSOLeaderClientConn() (*grpc.ClientConn, string) { - url := c.getTSOLeaderURL() + url := c.getLeaderURL() if len(url) == 0 { log.Fatal("[tso] the tso leader should exist") } @@ -228,7 +230,7 @@ type tsoConnectionContext struct { func (c *tsoClient) updateConnectionCtxs(ctx context.Context, connectionCtxs *sync.Map) bool { // Normal connection creating, it will be affected by the `enableForwarding`. createTSOConnection := c.tryConnectToTSO - if c.option.enableForwarding { + if c.option.getEnableTSOFollowerProxy() { createTSOConnection = c.tryConnectToTSOWithProxy } if err := createTSOConnection(ctx, connectionCtxs); err != nil { @@ -317,7 +319,7 @@ func (c *tsoClient) tryConnectToTSO( backupClientConn, backupURL := c.backupClientConn() if backupClientConn != nil { log.Info("[tso] fall back to use follower to forward tso stream", zap.String("follower-url", backupURL)) - forwardedHost := c.getTSOLeaderURL() + forwardedHost := c.getLeaderURL() if len(forwardedHost) == 0 { return errors.Errorf("cannot find the tso leader") } @@ -401,7 +403,7 @@ func (c *tsoClient) tryConnectToTSOWithProxy( ) error { tsoStreamBuilders := c.getAllTSOStreamBuilders() leaderAddr := c.svcDiscovery.GetServingURL() - forwardedHost := c.getTSOLeaderURL() + forwardedHost := c.getLeaderURL() if len(forwardedHost) == 0 { return errors.Errorf("cannot find the tso leader") } @@ -418,7 +420,8 @@ func (c *tsoClient) tryConnectToTSOWithProxy( }) // Update the missing one. for addr, tsoStreamBuilder := range tsoStreamBuilders { - if _, ok := connectionCtxs.Load(addr); ok { + _, ok := connectionCtxs.Load(addr) + if ok { continue } log.Info("[tso] try to create tso stream", zap.String("addr", addr)) @@ -473,13 +476,14 @@ func (c *tsoClient) getAllTSOStreamBuilders() map[string]tsoStreamBuilder { return streamBuilders } +// tryCreateTSODispatcher will try to create the TSO dispatcher if it is not created yet. func (c *tsoClient) tryCreateTSODispatcher() { // The dispatcher is already created. if c.getDispatcher() != nil { return } // The TSO leader is not ready. - url := c.getTSOLeaderURL() + url := c.getLeaderURL() if len(url) == 0 { return } diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index 147cadab633c..3d77610179dd 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -125,7 +125,7 @@ func newTSODispatcher( New: func() any { return newBatchController[*tsoRequest]( maxBatchSize*2, - tsoRequestFinisher(0, 0, 0, invalidStreamID), + tsoRequestFinisher(0, 0, invalidStreamID), tsoBestBatchSize, ) }, @@ -459,7 +459,7 @@ func (td *tsoDispatcher) handleProcessRequestError(ctx context.Context, bo *retr return true } -// updateConnectionCtxs updates the `connectionCtxs` for the specified DC location regularly. +// updateConnectionCtxs updates the `connectionCtxs` regularly. func (td *tsoDispatcher) connectionCtxsUpdater() { var ( ctx = td.ctx @@ -583,10 +583,10 @@ func (td *tsoDispatcher) processRequests( sourceStreamID: stream.streamID, } // `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request. - firstLogical := tsoutil.AddLogical(result.logical, -int64(result.count)+1, result.suffixBits) + firstLogical := result.logical - int64(result.count) + 1 // Do the check before releasing the token. td.checkMonotonicity(tsoInfoBeforeReq, curTSOInfo, firstLogical) - td.doneCollectedRequests(tbc, result.physical, firstLogical, result.suffixBits, stream.streamID) + td.doneCollectedRequests(tbc, result.physical, firstLogical, stream.streamID) } err := stream.processRequests( @@ -601,11 +601,11 @@ func (td *tsoDispatcher) processRequests( return nil } -func tsoRequestFinisher(physical, firstLogical int64, suffixBits uint32, streamID string) finisherFunc[*tsoRequest] { +func tsoRequestFinisher(physical, firstLogical int64, streamID string) finisherFunc[*tsoRequest] { return func(idx int, tsoReq *tsoRequest, err error) { // Retrieve the request context before the request is done to trace without race. requestCtx := tsoReq.requestCtx - tsoReq.physical, tsoReq.logical = physical, tsoutil.AddLogical(firstLogical, int64(idx), suffixBits) + tsoReq.physical, tsoReq.logical = physical, firstLogical+int64(idx) tsoReq.streamID = streamID tsoReq.tryDone(err) trace.StartRegion(requestCtx, "pdclient.tsoReqDequeue").End() @@ -614,12 +614,12 @@ func tsoRequestFinisher(physical, firstLogical int64, suffixBits uint32, streamI func (td *tsoDispatcher) cancelCollectedRequests(tbc *batchController[*tsoRequest], streamID string, err error) { td.tokenCh <- struct{}{} - tbc.finishCollectedRequests(tsoRequestFinisher(0, 0, 0, streamID), err) + tbc.finishCollectedRequests(tsoRequestFinisher(0, 0, streamID), err) } -func (td *tsoDispatcher) doneCollectedRequests(tbc *batchController[*tsoRequest], physical, firstLogical int64, suffixBits uint32, streamID string) { +func (td *tsoDispatcher) doneCollectedRequests(tbc *batchController[*tsoRequest], physical, firstLogical int64, streamID string) { td.tokenCh <- struct{}{} - tbc.finishCollectedRequests(tsoRequestFinisher(physical, firstLogical, suffixBits, streamID), nil) + tbc.finishCollectedRequests(tsoRequestFinisher(physical, firstLogical, streamID), nil) } // checkMonotonicity checks whether the monotonicity of the TSO allocation is violated. diff --git a/client/tso_stream.go b/client/tso_stream.go index 887e0ee02924..55bfd0b72b0b 100644 --- a/client/tso_stream.go +++ b/client/tso_stream.go @@ -110,7 +110,6 @@ func checkStreamTimeout(ctx context.Context, cancel context.CancelFunc, done cha type tsoRequestResult struct { physical, logical int64 count uint32 - suffixBits uint32 respKeyspaceGroupID uint32 } @@ -144,7 +143,6 @@ func (s pdTSOStreamAdapter) Recv() (tsoRequestResult, error) { physical: resp.GetTimestamp().GetPhysical(), logical: resp.GetTimestamp().GetLogical(), count: resp.GetCount(), - suffixBits: resp.GetTimestamp().GetSuffixBits(), respKeyspaceGroupID: defaultKeySpaceGroupID, }, nil } @@ -176,7 +174,6 @@ func (s tsoTSOStreamAdapter) Recv() (tsoRequestResult, error) { physical: resp.GetTimestamp().GetPhysical(), logical: resp.GetTimestamp().GetLogical(), count: resp.GetCount(), - suffixBits: resp.GetTimestamp().GetSuffixBits(), respKeyspaceGroupID: resp.GetHeader().GetKeyspaceGroupId(), }, nil } diff --git a/client/tso_stream_test.go b/client/tso_stream_test.go index 12fec0c7ddd8..a842befb5506 100644 --- a/client/tso_stream_test.go +++ b/client/tso_stream_test.go @@ -210,7 +210,6 @@ func (s *mockTSOStreamImpl) autoGenResult(count int64) resultMsg { physical: s.resGenPhysical, logical: s.resGenLogical, count: uint32(count), - suffixBits: 0, respKeyspaceGroupID: 0, }, } @@ -225,7 +224,6 @@ func (s *mockTSOStreamImpl) returnResult(physical int64, logical int64, count ui physical: physical, logical: logical, count: count, - suffixBits: 0, respKeyspaceGroupID: s.keyspaceID, }, } @@ -308,7 +306,6 @@ func (s *testTSOStreamSuite) processRequestWithResultCh(count int64) (<-chan cal err := s.stream.processRequests(1, 2, 3, count, time.Now(), func(result tsoRequestResult, reqKeyspaceGroupID uint32, err error) { if err == nil { s.re.Equal(uint32(3), reqKeyspaceGroupID) - s.re.Equal(uint32(0), result.suffixBits) } ch <- callbackInvocation{ result: result, diff --git a/client/utils/tsoutil/tsoutil.go b/client/utils/tsoutil/tsoutil.go index ffc449640ac5..34256373b364 100644 --- a/client/utils/tsoutil/tsoutil.go +++ b/client/utils/tsoutil/tsoutil.go @@ -18,11 +18,6 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" ) -// AddLogical shifts the count before we add it to the logical part. -func AddLogical(logical, count int64, suffixBits uint32) int64 { - return logical + count<>timestamp.GetSuffixBits(), uint32(tsoCount)) + re.GreaterOrEqual(uint32(timestamp.GetLogical()), uint32(tsoCount)) return timestamp } diff --git a/tools/pd-tso-bench/README.md b/tools/pd-tso-bench/README.md index 5eb94390546a..d1ec37364062 100644 --- a/tools/pd-tso-bench/README.md +++ b/tools/pd-tso-bench/README.md @@ -24,8 +24,6 @@ This section describes how to benchmark the GetTS performance. the number of pd clients involved in each benchmark (default 1) -count int the count number that the test will run (default 1) --dc string - which dc-location this bench will request (default "global") -duration duration how many seconds the test will last (default 1m0s) -interval duration