Skip to content

Commit

Permalink
Remove the suffix bits
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Nov 12, 2024
1 parent 30ac85a commit 1608b2d
Show file tree
Hide file tree
Showing 10 changed files with 30 additions and 40 deletions.
8 changes: 3 additions & 5 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/utils/tlsutil"
"github.com/tikv/pd/client/utils/tsoutil"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -774,9 +773,8 @@ func (c *client) GetTS(ctx context.Context) (physical int64, logical int64, err
}

// GetLocalTS implements the TSOClient interface.
func (c *client) GetLocalTS(ctx context.Context, dcLocation string) (physical int64, logical int64, err error) {
resp := c.GetLocalTSAsync(ctx, dcLocation)
return resp.Wait()
func (c *client) GetLocalTS(ctx context.Context, _ string) (physical int64, logical int64, err error) {
return c.GetTS(ctx)

Check warning on line 777 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L776-L777

Added lines #L776 - L777 were not covered by tests
}

// GetMinTS implements the TSOClient interface.
Expand Down Expand Up @@ -822,7 +820,7 @@ func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, e
}

minTS := resp.GetTimestamp()
return minTS.Physical, tsoutil.AddLogical(minTS.Logical, 0, minTS.SuffixBits), nil
return minTS.Physical, minTS.Logical, nil
}

func handleRegionResponse(res *pdpb.GetRegionResponse) *Region {
Expand Down
3 changes: 2 additions & 1 deletion client/pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,8 +384,9 @@ func (c *pdServiceBalancer) get() (ret ServiceClient) {
type updateKeyspaceIDFunc func() error
type tsoLeaderURLUpdatedFunc func(string) error

// tsoEventSource subscribes to events related to changes in the TSO leader/primary from the service discovery.
type tsoEventSource interface {
// SetTSOLeaderURLUpdatedCallback adds a callback which will be called when the TSO leader is updated.
// SetTSOLeaderURLUpdatedCallback adds a callback which will be called when the TSO leader/primary is updated.
SetTSOLeaderURLUpdatedCallback(callback tsoLeaderURLUpdatedFunc)
}

Expand Down
22 changes: 13 additions & 9 deletions client/tso_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,12 @@ type TSOClient interface {
GetMinTS(ctx context.Context) (int64, int64, error)

// GetLocalTS gets a local timestamp from PD or TSO microservice.
// Deprecated: Local TSO will be completely removed in the future.
// Deprecated: Local TSO will be completely removed in the future. Currently, regardless of the
// parameters passed in, this method will default to returning the global TSO.
GetLocalTS(ctx context.Context, _ string) (int64, int64, error)
// GetLocalTSAsync gets a local timestamp from PD or TSO microservice, without block the caller.
// Deprecated: Local TSO will be completely removed in the future.
// Deprecated: Local TSO will be completely removed in the future. Currently, regardless of the
// parameters passed in, this method will default to returning the global TSO.
GetLocalTSAsync(ctx context.Context, _ string) TSFuture
}

Expand Down Expand Up @@ -155,7 +157,7 @@ func (c *tsoClient) getTSORequest(ctx context.Context) *tsoRequest {
return req
}

func (c *tsoClient) getTSOLeaderURL() string {
func (c *tsoClient) getLeaderURL() string {
url := c.leaderURL.Load()
if url == nil {
return ""

Check warning on line 163 in client/tso_client.go

View check run for this annotation

Codecov / codecov/patch

client/tso_client.go#L163

Added line #L163 was not covered by tests
Expand All @@ -165,7 +167,7 @@ func (c *tsoClient) getTSOLeaderURL() string {

// getTSOLeaderClientConn returns the TSO leader gRPC client connection.
func (c *tsoClient) getTSOLeaderClientConn() (*grpc.ClientConn, string) {
url := c.getTSOLeaderURL()
url := c.getLeaderURL()
if len(url) == 0 {
log.Fatal("[tso] the tso leader should exist")

Check warning on line 172 in client/tso_client.go

View check run for this annotation

Codecov / codecov/patch

client/tso_client.go#L172

Added line #L172 was not covered by tests
}
Expand Down Expand Up @@ -228,7 +230,7 @@ type tsoConnectionContext struct {
func (c *tsoClient) updateConnectionCtxs(ctx context.Context, connectionCtxs *sync.Map) bool {
// Normal connection creating, it will be affected by the `enableForwarding`.
createTSOConnection := c.tryConnectToTSO
if c.option.enableForwarding {
if c.option.getEnableTSOFollowerProxy() {
createTSOConnection = c.tryConnectToTSOWithProxy
}
if err := createTSOConnection(ctx, connectionCtxs); err != nil {
Expand Down Expand Up @@ -317,7 +319,7 @@ func (c *tsoClient) tryConnectToTSO(
backupClientConn, backupURL := c.backupClientConn()
if backupClientConn != nil {
log.Info("[tso] fall back to use follower to forward tso stream", zap.String("follower-url", backupURL))
forwardedHost := c.getTSOLeaderURL()
forwardedHost := c.getLeaderURL()
if len(forwardedHost) == 0 {
return errors.Errorf("cannot find the tso leader")

Check warning on line 324 in client/tso_client.go

View check run for this annotation

Codecov / codecov/patch

client/tso_client.go#L324

Added line #L324 was not covered by tests
}
Expand Down Expand Up @@ -401,7 +403,7 @@ func (c *tsoClient) tryConnectToTSOWithProxy(
) error {
tsoStreamBuilders := c.getAllTSOStreamBuilders()
leaderAddr := c.svcDiscovery.GetServingURL()
forwardedHost := c.getTSOLeaderURL()
forwardedHost := c.getLeaderURL()
if len(forwardedHost) == 0 {
return errors.Errorf("cannot find the tso leader")

Check warning on line 408 in client/tso_client.go

View check run for this annotation

Codecov / codecov/patch

client/tso_client.go#L408

Added line #L408 was not covered by tests
}
Expand All @@ -418,7 +420,8 @@ func (c *tsoClient) tryConnectToTSOWithProxy(
})
// Update the missing one.
for addr, tsoStreamBuilder := range tsoStreamBuilders {
if _, ok := connectionCtxs.Load(addr); ok {
_, ok := connectionCtxs.Load(addr)
if ok {
continue
}
log.Info("[tso] try to create tso stream", zap.String("addr", addr))
Expand Down Expand Up @@ -473,13 +476,14 @@ func (c *tsoClient) getAllTSOStreamBuilders() map[string]tsoStreamBuilder {
return streamBuilders
}

// tryCreateTSODispatcher will try to create the TSO dispatcher if it is not created yet.
func (c *tsoClient) tryCreateTSODispatcher() {
// The dispatcher is already created.
if c.getDispatcher() != nil {
return
}
// The TSO leader is not ready.
url := c.getTSOLeaderURL()
url := c.getLeaderURL()
if len(url) == 0 {
return
}

Check warning on line 489 in client/tso_client.go

View check run for this annotation

Codecov / codecov/patch

client/tso_client.go#L488-L489

Added lines #L488 - L489 were not covered by tests
Expand Down
18 changes: 9 additions & 9 deletions client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func newTSODispatcher(
New: func() any {
return newBatchController[*tsoRequest](
maxBatchSize*2,
tsoRequestFinisher(0, 0, 0, invalidStreamID),
tsoRequestFinisher(0, 0, invalidStreamID),
tsoBestBatchSize,
)
},
Expand Down Expand Up @@ -459,7 +459,7 @@ func (td *tsoDispatcher) handleProcessRequestError(ctx context.Context, bo *retr
return true
}

// updateConnectionCtxs updates the `connectionCtxs` for the specified DC location regularly.
// updateConnectionCtxs updates the `connectionCtxs` regularly.
func (td *tsoDispatcher) connectionCtxsUpdater() {
var (
ctx = td.ctx
Expand Down Expand Up @@ -583,10 +583,10 @@ func (td *tsoDispatcher) processRequests(
sourceStreamID: stream.streamID,
}
// `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request.
firstLogical := tsoutil.AddLogical(result.logical, -int64(result.count)+1, result.suffixBits)
firstLogical := result.logical - int64(result.count) + 1
// Do the check before releasing the token.
td.checkMonotonicity(tsoInfoBeforeReq, curTSOInfo, firstLogical)
td.doneCollectedRequests(tbc, result.physical, firstLogical, result.suffixBits, stream.streamID)
td.doneCollectedRequests(tbc, result.physical, firstLogical, stream.streamID)
}

err := stream.processRequests(
Expand All @@ -601,11 +601,11 @@ func (td *tsoDispatcher) processRequests(
return nil
}

func tsoRequestFinisher(physical, firstLogical int64, suffixBits uint32, streamID string) finisherFunc[*tsoRequest] {
func tsoRequestFinisher(physical, firstLogical int64, streamID string) finisherFunc[*tsoRequest] {
return func(idx int, tsoReq *tsoRequest, err error) {
// Retrieve the request context before the request is done to trace without race.
requestCtx := tsoReq.requestCtx
tsoReq.physical, tsoReq.logical = physical, tsoutil.AddLogical(firstLogical, int64(idx), suffixBits)
tsoReq.physical, tsoReq.logical = physical, firstLogical+int64(idx)
tsoReq.streamID = streamID
tsoReq.tryDone(err)
trace.StartRegion(requestCtx, "pdclient.tsoReqDequeue").End()
Expand All @@ -614,12 +614,12 @@ func tsoRequestFinisher(physical, firstLogical int64, suffixBits uint32, streamI

func (td *tsoDispatcher) cancelCollectedRequests(tbc *batchController[*tsoRequest], streamID string, err error) {
td.tokenCh <- struct{}{}
tbc.finishCollectedRequests(tsoRequestFinisher(0, 0, 0, streamID), err)
tbc.finishCollectedRequests(tsoRequestFinisher(0, 0, streamID), err)
}

func (td *tsoDispatcher) doneCollectedRequests(tbc *batchController[*tsoRequest], physical, firstLogical int64, suffixBits uint32, streamID string) {
func (td *tsoDispatcher) doneCollectedRequests(tbc *batchController[*tsoRequest], physical, firstLogical int64, streamID string) {
td.tokenCh <- struct{}{}
tbc.finishCollectedRequests(tsoRequestFinisher(physical, firstLogical, suffixBits, streamID), nil)
tbc.finishCollectedRequests(tsoRequestFinisher(physical, firstLogical, streamID), nil)
}

// checkMonotonicity checks whether the monotonicity of the TSO allocation is violated.
Expand Down
3 changes: 0 additions & 3 deletions client/tso_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ func checkStreamTimeout(ctx context.Context, cancel context.CancelFunc, done cha
type tsoRequestResult struct {
physical, logical int64
count uint32
suffixBits uint32
respKeyspaceGroupID uint32
}

Expand Down Expand Up @@ -144,7 +143,6 @@ func (s pdTSOStreamAdapter) Recv() (tsoRequestResult, error) {
physical: resp.GetTimestamp().GetPhysical(),
logical: resp.GetTimestamp().GetLogical(),
count: resp.GetCount(),
suffixBits: resp.GetTimestamp().GetSuffixBits(),
respKeyspaceGroupID: defaultKeySpaceGroupID,
}, nil
}
Expand Down Expand Up @@ -176,7 +174,6 @@ func (s tsoTSOStreamAdapter) Recv() (tsoRequestResult, error) {
physical: resp.GetTimestamp().GetPhysical(),
logical: resp.GetTimestamp().GetLogical(),
count: resp.GetCount(),
suffixBits: resp.GetTimestamp().GetSuffixBits(),
respKeyspaceGroupID: resp.GetHeader().GetKeyspaceGroupId(),
}, nil
}
Expand Down
3 changes: 0 additions & 3 deletions client/tso_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,6 @@ func (s *mockTSOStreamImpl) autoGenResult(count int64) resultMsg {
physical: s.resGenPhysical,
logical: s.resGenLogical,
count: uint32(count),
suffixBits: 0,
respKeyspaceGroupID: 0,
},
}
Expand All @@ -225,7 +224,6 @@ func (s *mockTSOStreamImpl) returnResult(physical int64, logical int64, count ui
physical: physical,
logical: logical,
count: count,
suffixBits: 0,
respKeyspaceGroupID: s.keyspaceID,
},
}
Expand Down Expand Up @@ -308,7 +306,6 @@ func (s *testTSOStreamSuite) processRequestWithResultCh(count int64) (<-chan cal
err := s.stream.processRequests(1, 2, 3, count, time.Now(), func(result tsoRequestResult, reqKeyspaceGroupID uint32, err error) {
if err == nil {
s.re.Equal(uint32(3), reqKeyspaceGroupID)
s.re.Equal(uint32(0), result.suffixBits)
}
ch <- callbackInvocation{
result: result,
Expand Down
5 changes: 0 additions & 5 deletions client/utils/tsoutil/tsoutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,6 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"
)

// AddLogical shifts the count before we add it to the logical part.
func AddLogical(logical, count int64, suffixBits uint32) int64 {
return logical + count<<suffixBits
}

// TSLessEqual returns true if (physical, logical) <= (thatPhysical, thatLogical).
func TSLessEqual(physical, logical, thatPhysical, thatLogical int64) bool {
if physical == thatPhysical {
Expand Down
4 changes: 2 additions & 2 deletions tests/integrations/mcs/tso/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,8 @@ func (s *tsoProxyTestSuite) verifyTSOProxy(
re.Equal(req.GetCount(), resp.GetCount())
ts := resp.GetTimestamp()
count := int64(resp.GetCount())
physical, largestLogic, suffixBits := ts.GetPhysical(), ts.GetLogical(), ts.GetSuffixBits()
firstLogical := tsoutil.AddLogical(largestLogic, -count+1, suffixBits)
physical, largestLogic := ts.GetPhysical(), ts.GetLogical()
firstLogical := largestLogic - count + 1
re.False(tsoutil.TSLessEqual(physical, firstLogical, lastPhysical, lastLogical))
}
}(i)
Expand Down
2 changes: 1 addition & 1 deletion tests/integrations/tso/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,6 @@ func checkAndReturnTimestampResponse[T tsoResponse](re *require.Assertions, resp
re.Equal(uint32(tsoCount), resp.GetCount())
timestamp := resp.GetTimestamp()
re.Positive(timestamp.GetPhysical())
re.GreaterOrEqual(uint32(timestamp.GetLogical())>>timestamp.GetSuffixBits(), uint32(tsoCount))
re.GreaterOrEqual(uint32(timestamp.GetLogical()), uint32(tsoCount))
return timestamp
}
2 changes: 0 additions & 2 deletions tools/pd-tso-bench/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ This section describes how to benchmark the GetTS performance.
the number of pd clients involved in each benchmark (default 1)
-count int
the count number that the test will run (default 1)
-dc string
which dc-location this bench will request (default "global")
-duration duration
how many seconds the test will last (default 1m0s)
-interval duration
Expand Down

0 comments on commit 1608b2d

Please sign in to comment.