Skip to content

Commit

Permalink
tso/local: remove local tso completely (#8864)
Browse files Browse the repository at this point in the history
close #8802

Signed-off-by: okJiang <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
okJiang and ti-chi-bot[bot] authored Dec 11, 2024
1 parent fbfcdb8 commit 5d62787
Show file tree
Hide file tree
Showing 20 changed files with 53 additions and 189 deletions.
16 changes: 8 additions & 8 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,10 +509,10 @@ func (c *client) GetTSAsync(ctx context.Context) tso.TSFuture {
return c.inner.dispatchTSORequestWithRetry(ctx)
}

// GetLocalTSAsync implements the TSOClient interface.
//
// Deprecated: Local TSO will be completely removed in the future. Currently, regardless of the
// parameters passed in, this method will default to returning the global TSO.
// Deprecated: the Local TSO feature has been deprecated. Regardless of the
// parameters passed, the behavior of this interface will be equivalent to
// `GetTSAsync`. If you want to use a separately deployed TSO service,
// please refer to the deployment of the TSO microservice.
func (c *client) GetLocalTSAsync(ctx context.Context, _ string) tso.TSFuture {
return c.GetTSAsync(ctx)
}
Expand All @@ -523,10 +523,10 @@ func (c *client) GetTS(ctx context.Context) (physical int64, logical int64, err
return resp.Wait()
}

// GetLocalTS implements the TSOClient interface.
//
// Deprecated: Local TSO will be completely removed in the future. Currently, regardless of the
// parameters passed in, this method will default to returning the global TSO.
// Deprecated: the Local TSO feature has been deprecated. Regardless of the
// parameters passed, the behavior of this interface will be equivalent to
// `GetTS`. If you want to use a separately deployed TSO service,
// please refer to the deployment of the TSO microservice.
func (c *client) GetLocalTS(ctx context.Context, _ string) (physical int64, logical int64, err error) {
return c.GetTS(ctx)
}
Expand Down
16 changes: 8 additions & 8 deletions client/clients/tso/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ type Client interface {
// the TSO microservice.
GetMinTS(ctx context.Context) (int64, int64, error)

// GetLocalTS gets a local timestamp from PD or TSO microservice.
//
// Deprecated: Local TSO will be completely removed in the future. Currently, regardless of the
// parameters passed in, this method will default to returning the global TSO.
// Deprecated: the Local TSO feature has been deprecated. Regardless of the
// parameters passed, the behavior of this interface will be equivalent to
// `GetTS`. If you want to use a separately deployed TSO service,
// please refer to the deployment of the TSO microservice.
GetLocalTS(ctx context.Context, _ string) (int64, int64, error)
// GetLocalTSAsync gets a local timestamp from PD or TSO microservice, without block the caller.
//
// Deprecated: Local TSO will be completely removed in the future. Currently, regardless of the
// parameters passed in, this method will default to returning the global TSO.
// Deprecated: the Local TSO feature has been deprecated. Regardless of the
// parameters passed, the behavior of this interface will be equivalent to
// `GetTSAsync`. If you want to use a separately deployed TSO service,
// please refer to the deployment of the TSO microservice.
GetLocalTSAsync(ctx context.Context, _ string) TSFuture
}

Expand Down
5 changes: 1 addition & 4 deletions pkg/mcs/tso/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,7 @@ type Config struct {
// the primary/leader again. Etcd only supports seconds TTL, so here is second too.
LeaderLease int64 `toml:"lease" json:"lease"`

// EnableLocalTSO is used to enable the Local TSO Allocator feature,
// which allows the PD server to generate Local TSO for certain DC-level transactions.
// To make this feature meaningful, user has to set the "zone" label for the PD server
// to indicate which DC this PD belongs to.
// Deprecated
EnableLocalTSO bool `toml:"enable-local-tso" json:"enable-local-tso"`

// TSOSaveInterval is the interval to save timestamp.
Expand Down
5 changes: 0 additions & 5 deletions pkg/mcs/tso/server/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ func TestConfigBasic(t *testing.T) {
re.Equal(defaultBackendEndpoints, cfg.BackendEndpoints)
re.Equal(defaultListenAddr, cfg.ListenAddr)
re.Equal(constant.DefaultLeaderLease, cfg.LeaderLease)
re.False(cfg.EnableLocalTSO)
re.True(cfg.EnableGRPCGateway)
re.Equal(defaultTSOSaveInterval, cfg.TSOSaveInterval.Duration)
re.Equal(defaultTSOUpdatePhysicalInterval, cfg.TSOUpdatePhysicalInterval.Duration)
Expand All @@ -48,7 +47,6 @@ func TestConfigBasic(t *testing.T) {
cfg.ListenAddr = "test-listen-addr"
cfg.AdvertiseListenAddr = "test-advertise-listen-addr"
cfg.LeaderLease = 123
cfg.EnableLocalTSO = true
cfg.TSOSaveInterval.Duration = time.Duration(10) * time.Second
cfg.TSOUpdatePhysicalInterval.Duration = time.Duration(100) * time.Millisecond
cfg.MaxResetTSGap.Duration = time.Duration(1) * time.Hour
Expand All @@ -58,7 +56,6 @@ func TestConfigBasic(t *testing.T) {
re.Equal("test-listen-addr", cfg.GetListenAddr())
re.Equal("test-advertise-listen-addr", cfg.GetAdvertiseListenAddr())
re.Equal(int64(123), cfg.GetLeaderLease())
re.True(cfg.EnableLocalTSO)
re.Equal(time.Duration(10)*time.Second, cfg.TSOSaveInterval.Duration)
re.Equal(time.Duration(100)*time.Millisecond, cfg.TSOUpdatePhysicalInterval.Duration)
re.Equal(time.Duration(1)*time.Hour, cfg.MaxResetTSGap.Duration)
Expand All @@ -74,7 +71,6 @@ name = "tso-test-name"
data-dir = "/var/lib/tso"
enable-grpc-gateway = false
lease = 123
enable-local-tso = true
tso-save-interval = "10s"
tso-update-physical-interval = "100ms"
max-gap-reset-ts = "1h"
Expand All @@ -92,7 +88,6 @@ max-gap-reset-ts = "1h"
re.Equal("test-advertise-listen-addr", cfg.GetAdvertiseListenAddr())
re.Equal("/var/lib/tso", cfg.DataDir)
re.Equal(int64(123), cfg.GetLeaderLease())
re.True(cfg.EnableLocalTSO)
re.Equal(time.Duration(10)*time.Second, cfg.TSOSaveInterval.Duration)
re.Equal(time.Duration(100)*time.Millisecond, cfg.TSOUpdatePhysicalInterval.Duration)
re.Equal(time.Duration(1)*time.Hour, cfg.MaxResetTSGap.Duration)
Expand Down
9 changes: 0 additions & 9 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,15 +272,6 @@ func (s *Server) GetTSOAllocatorManager(keyspaceGroupID uint32) (*tso.AllocatorM
return s.keyspaceGroupManager.GetAllocatorManager(keyspaceGroupID)
}

// IsLocalRequest checks if the forwarded host is the current host
func (*Server) IsLocalRequest(forwardedHost string) bool {
// TODO: Check if the forwarded host is the current host.
// The logic is depending on etcd service mode -- if the TSO service
// uses the embedded etcd, check against ClientUrls; otherwise check
// against the cluster membership.
return forwardedHost == ""
}

// ValidateInternalRequest checks if server is closed, which is used to validate
// the gRPC communication between TSO servers internally.
// TODO: Check if the sender is from the global TSO allocator
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/endpoint/service_middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type ServiceMiddlewareStorage interface {

var _ ServiceMiddlewareStorage = (*StorageEndpoint)(nil)

// LoadServiceMiddlewareConfig loads service middleware config from keypath.KeyspaceGroupLocalTSPath then unmarshal it to cfg.
// LoadServiceMiddlewareConfig loads service middleware config from ServiceMiddlewarePath then unmarshal it to cfg.
func (se *StorageEndpoint) LoadServiceMiddlewareConfig(cfg any) (bool, error) {
value, err := se.Load(keypath.ServiceMiddlewarePath)
if err != nil || value == "" {
Expand All @@ -42,7 +42,7 @@ func (se *StorageEndpoint) LoadServiceMiddlewareConfig(cfg any) (bool, error) {
return true, nil
}

// SaveServiceMiddlewareConfig stores marshallable cfg to the keypath.KeyspaceGroupLocalTSPath.
// SaveServiceMiddlewareConfig stores marshallable cfg to the ServiceMiddlewarePath.
func (se *StorageEndpoint) SaveServiceMiddlewareConfig(cfg any) error {
return se.saveJSON(keypath.ServiceMiddlewarePath, cfg)
}
6 changes: 3 additions & 3 deletions pkg/storage/endpoint/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ type TSOStorage interface {

var _ TSOStorage = (*StorageEndpoint)(nil)

// LoadTimestamp will get all time windows of Local/Global TSOs from etcd and return the biggest one.
// For the Global TSO, loadTimestamp will get all Local and Global TSO time windows persisted in etcd and choose the biggest one.
// For the Local TSO, loadTimestamp will only get its own dc-location time window persisted before.
// LoadTimestamp will get all time windows of Global TSOs from etcd and return the biggest one.
// TODO: Due to local TSO is deprecated, maybe we do not need to load timestamp
// by prefix, we can just load the timestamp by the key.
func (se *StorageEndpoint) LoadTimestamp(prefix string) (time.Time, error) {
prefixEnd := clientv3.GetPrefixRangeEnd(prefix)
keys, values, err := se.LoadRange(prefix, prefixEnd, 0)
Expand Down
30 changes: 0 additions & 30 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ package tso

import (
"context"
"math"
"path"
"runtime/trace"
"strconv"
"sync"
Expand All @@ -43,8 +41,6 @@ const (
checkStep = time.Minute
patrolStep = time.Second
defaultAllocatorLeaderLease = 3
localTSOAllocatorEtcdPrefix = "lta"
localTSOSuffixEtcdPrefix = "lts"
)

var (
Expand Down Expand Up @@ -217,17 +213,6 @@ func (am *AllocatorManager) getGroupIDStr() string {
return strconv.FormatUint(uint64(am.kgID), 10)
}

// GetTimestampPath returns the timestamp path in etcd.
func (am *AllocatorManager) GetTimestampPath() string {
if am == nil {
return ""
}

am.mu.RLock()
defer am.mu.RUnlock()
return path.Join(am.rootPath, am.mu.allocatorGroup.allocator.GetTimestampPath())
}

// tsoAllocatorLoop is used to run the TSO Allocator updating daemon.
func (am *AllocatorManager) tsoAllocatorLoop() {
defer logutil.LogPanic()
Expand All @@ -254,21 +239,6 @@ func (am *AllocatorManager) GetMember() ElectionMember {
return am.member
}

// GetSuffixBits calculates the bits of suffix sign
// by the max number of suffix so far,
// which will be used in the TSO logical part.
func (am *AllocatorManager) GetSuffixBits() int {
am.mu.RLock()
defer am.mu.RUnlock()
return CalSuffixBits(am.mu.maxSuffix)
}

// CalSuffixBits calculates the bits of suffix by the max suffix sign.
func CalSuffixBits(maxSuffix int32) int {
// maxSuffix + 1 because we have the Global TSO holds 0 as the suffix sign
return int(math.Ceil(math.Log2(float64(maxSuffix + 1))))
}

// AllocatorDaemon is used to update every allocator's TSO and check whether we have
// any new local allocator that needs to be set up.
func (am *AllocatorManager) AllocatorDaemon(ctx context.Context) {
Expand Down
22 changes: 3 additions & 19 deletions pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,6 @@ type Allocator interface {
IsInitialize() bool
// UpdateTSO is used to update the TSO in memory and the time window in etcd.
UpdateTSO() error
// GetTimestampPath returns the timestamp path in etcd, which is:
// 1. for the default keyspace group:
// a. timestamp in /pd/{cluster_id}/timestamp
// b. lta/{dc-location}/timestamp in /pd/{cluster_id}/lta/{dc-location}/timestamp
// 1. for the non-default keyspace groups:
// a. {group}/gts/timestamp in /ms/{cluster_id}/tso/{group}/gta/timestamp
// b. {group}/lts/{dc-location}/timestamp in /ms/{cluster_id}/tso/{group}/lta/{dc-location}/timestamp
GetTimestampPath() string
// SetTSO sets the physical part with given TSO. It's mainly used for BR restore.
// Cannot set the TSO smaller than now in any case.
// if ignoreSmaller=true, if input ts is smaller than current, ignore silently, else return error
Expand All @@ -68,6 +60,8 @@ type Allocator interface {
}

// GlobalTSOAllocator is the global single point TSO allocator.
// TODO: Local TSO allocator is deprecated now, we can update the name to
// TSOAllocator and remove the `Global` concept.
type GlobalTSOAllocator struct {
ctx context.Context
cancel context.CancelFunc
Expand Down Expand Up @@ -132,19 +126,9 @@ func (gta *GlobalTSOAllocator) getGroupID() uint32 {
return gta.am.getGroupID()
}

// GetTimestampPath returns the timestamp path in etcd.
func (gta *GlobalTSOAllocator) GetTimestampPath() string {
if gta == nil || gta.timestampOracle == nil {
return ""
}
return gta.timestampOracle.GetTimestampPath()
}

// Initialize will initialize the created global TSO allocator.
func (gta *GlobalTSOAllocator) Initialize(int) error {
gta.tsoAllocatorRoleGauge.Set(1)
// The suffix of a Global TSO should always be 0.
gta.timestampOracle.suffix = 0
return gta.timestampOracle.SyncTimestamp()
}

Expand Down Expand Up @@ -175,7 +159,7 @@ func (gta *GlobalTSOAllocator) GenerateTSO(ctx context.Context, count uint32) (p
return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs(fmt.Sprintf("requested pd %s of cluster", errs.NotLeaderErr))
}

return gta.timestampOracle.getTS(ctx, gta.member.GetLeadership(), count, 0)
return gta.timestampOracle.getTS(ctx, gta.member.GetLeadership(), count)
}

// Reset is used to reset the TSO allocator.
Expand Down
6 changes: 2 additions & 4 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ const (
type state struct {
syncutil.RWMutex
// ams stores the allocator managers of the keyspace groups. Each keyspace group is
// assigned with an allocator manager managing its global/local tso allocators.
// assigned with an allocator manager managing its global tso allocators.
// Use a fixed size array to maximize the efficiency of concurrent access to
// different keyspace groups for tso service.
ams [constant.MaxKeyspaceGroupCountInUse]*AllocatorManager
Expand Down Expand Up @@ -790,8 +790,7 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
am := NewAllocatorManager(kgm.ctx, group.ID, participant, tsRootPath, storage, kgm.cfg)
am.startGlobalAllocatorLoop()
log.Info("created allocator manager",
zap.Uint32("keyspace-group-id", group.ID),
zap.String("timestamp-path", am.GetTimestampPath()))
zap.Uint32("keyspace-group-id", group.ID))
kgm.Lock()
group.KeyspaceLookupTable = make(map[uint32]struct{})
for _, kid := range group.Keyspaces {
Expand Down Expand Up @@ -1517,7 +1516,6 @@ func (kgm *KeyspaceGroupManager) deletedGroupCleaner() {
log.Info("delete the keyspace group tso key",
zap.Uint32("keyspace-group-id", groupID))
// Clean up the remaining TSO keys.
// TODO: support the Local TSO Allocator clean up.
err := kgm.tsoSvcStorage.DeleteTimestamp(
keypath.TimestampPath(
keypath.KeyspaceGroupGlobalTSPath(groupID),
Expand Down
1 change: 0 additions & 1 deletion pkg/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ func (suite *keyspaceGroupManagerTestSuite) createConfig() *TestServiceConfig {
ListenAddr: addr,
AdvertiseListenAddr: addr,
LeaderLease: constant.DefaultLeaderLease,
LocalTSOEnabled: false,
TSOUpdatePhysicalInterval: 50 * time.Millisecond,
TSOSaveInterval: time.Duration(constant.DefaultLeaderLease) * time.Second,
MaxResetTSGap: time.Hour * 24,
Expand Down
1 change: 0 additions & 1 deletion pkg/tso/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ type TestServiceConfig struct {
ListenAddr string // Address the service listens on.
AdvertiseListenAddr string // Address the service advertises to the clients.
LeaderLease int64 // Leader lease.
LocalTSOEnabled bool // Whether local TSO is enabled.
TSOUpdatePhysicalInterval time.Duration // Interval to update TSO in physical storage.
TSOSaveInterval time.Duration // Interval to save TSO to physical storage.
MaxResetTSGap time.Duration // Maximum gap to reset TSO.
Expand Down
Loading

0 comments on commit 5d62787

Please sign in to comment.