diff --git a/.github/workflows/tso-consistency-test.yaml b/.github/workflows/tso-consistency-test.yaml deleted file mode 100644 index 5fb0fd70552..00000000000 --- a/.github/workflows/tso-consistency-test.yaml +++ /dev/null @@ -1,17 +0,0 @@ -name: TSO Consistency Test -on: - # Only run when the new code is merged into master. - push: - branches: - - master -jobs: - tso-consistency-test: - runs-on: ubuntu-latest - steps: - - name: Checkout code - uses: actions/checkout@v4 - - uses: actions/setup-go@v5 - with: - go-version: '1.23' - - name: Make TSO Consistency Test - run: make test-tso-consistency diff --git a/Makefile b/Makefile index 51a1b04b71c..f7f1cba1102 100644 --- a/Makefile +++ b/Makefile @@ -243,7 +243,7 @@ SUBMODULES := $(filter $(shell find . -iname "go.mod" -exec dirname {} \;),\ test: install-tools # testing all pkgs... @$(FAILPOINT_ENABLE) - CGO_ENABLED=1 go test -tags tso_function_test,deadlock -timeout 20m -race -cover $(TEST_PKGS) || { $(FAILPOINT_DISABLE); exit 1; } + CGO_ENABLED=1 go test -tags deadlock -timeout 20m -race -cover $(TEST_PKGS) || { $(FAILPOINT_DISABLE); exit 1; } @$(FAILPOINT_DISABLE) basic-test: install-tools @@ -257,24 +257,12 @@ ci-test-job: install-tools dashboard-ui pd-ut ./scripts/ci-subtask.sh $(JOB_INDEX) || { $(FAILPOINT_DISABLE); exit 1; } @$(FAILPOINT_DISABLE) -TSO_INTEGRATION_TEST_PKGS := $(PD_PKG)/tests/server/tso - -test-tso: install-tools - # testing TSO function & consistency... - @$(FAILPOINT_ENABLE) - CGO_ENABLED=1 go test -race -tags without_dashboard,tso_full_test,deadlock $(TSO_INTEGRATION_TEST_PKGS) || { $(FAILPOINT_DISABLE); exit 1; } - @$(FAILPOINT_DISABLE) +TSO_FUNCTION_TEST_PKGS := $(PD_PKG)/tests/server/tso test-tso-function: install-tools # testing TSO function... @$(FAILPOINT_ENABLE) - CGO_ENABLED=1 go test -race -tags without_dashboard,tso_function_test,deadlock $(TSO_INTEGRATION_TEST_PKGS) || { $(FAILPOINT_DISABLE); exit 1; } - @$(FAILPOINT_DISABLE) - -test-tso-consistency: install-tools - # testing TSO consistency... - @$(FAILPOINT_ENABLE) - CGO_ENABLED=1 go test -race -tags without_dashboard,tso_consistency_test,deadlock $(TSO_INTEGRATION_TEST_PKGS) || { $(FAILPOINT_DISABLE); exit 1; } + CGO_ENABLED=1 go test -race -tags without_dashboard,deadlock $(TSO_FUNCTION_TEST_PKGS) || { $(FAILPOINT_DISABLE); exit 1; } @$(FAILPOINT_DISABLE) REAL_CLUSTER_TEST_PATH := $(ROOT_PATH)/tests/integrations/realcluster @@ -302,7 +290,7 @@ test-with-cover-parallel: install-tools dashboard-ui split split: # todo: it will remove server/api,/tests and tso packages after daily CI integrate all verify CI. - go list ./... | grep -v -E "github.com/tikv/pd/server/api|github.com/tikv/pd/tests/client|github.com/tikv/pd/tests/server/tso" > packages.list;\ + go list ./... 
| grep -v -E "github.com/tikv/pd/server/api|github.com/tikv/pd/tests/client|$(TSO_FUNCTION_TEST_PKGS)" > packages.list;\ split packages.list -n r/${TASK_COUNT} packages_unit_ -a 1 --numeric-suffixes=1;\ cat packages_unit_${TASK_ID} |tr "\n" " " >package.list;\ rm packages*; diff --git a/pd.code-workspace b/pd.code-workspace index c5767e86b22..28722d4fa61 100644 --- a/pd.code-workspace +++ b/pd.code-workspace @@ -17,5 +17,7 @@ "path": "tools" } ], - "settings": {} + "settings": { + "makefile.configureOnOpen": false + } } \ No newline at end of file diff --git a/pkg/storage/storage_tso_test.go b/pkg/storage/storage_tso_test.go index 15a17e6a41d..07ea79df47c 100644 --- a/pkg/storage/storage_tso_test.go +++ b/pkg/storage/storage_tso_test.go @@ -37,34 +37,6 @@ func TestSaveLoadTimestamp(t *testing.T) { re.Equal(expectedTS, ts) } -func TestGlobalLocalTimestamp(t *testing.T) { - re := require.New(t) - storage, clean := newTestStorage(t) - defer clean() - ltaKey := "lta" - dc1LocationKey, dc2LocationKey := "dc1", "dc2" - localTS1 := time.Now().Round(0) - l1 := path.Join(ltaKey, dc1LocationKey, keypath.TimestampKey) - l2 := path.Join(ltaKey, dc2LocationKey, keypath.TimestampKey) - - err := storage.SaveTimestamp(l1, localTS1) - re.NoError(err) - globalTS := time.Now().Round(0) - err = storage.SaveTimestamp(keypath.TimestampKey, globalTS) - re.NoError(err) - localTS2 := time.Now().Round(0) - err = storage.SaveTimestamp(l2, localTS2) - re.NoError(err) - // return the max ts between global and local - ts, err := storage.LoadTimestamp("") - re.NoError(err) - re.Equal(localTS2, ts) - // return the local ts for a given dc location - ts, err = storage.LoadTimestamp(l1) - re.NoError(err) - re.Equal(localTS1, ts) -} - func TestTimestampTxn(t *testing.T) { re := require.New(t) storage, clean := newTestStorage(t) diff --git a/tests/server/member/member_test.go b/tests/server/member/member_test.go index 13f7fffe083..8c6ca6f76bf 100644 --- a/tests/server/member/member_test.go +++ b/tests/server/member/member_test.go @@ -47,16 +47,7 @@ func TestMemberDelete(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dcLocationConfig := map[string]string{ - "pd1": "dc-1", - "pd2": "dc-2", - "pd3": "dc-3", - } - dcLocationNum := len(dcLocationConfig) - cluster, err := tests.NewTestCluster(ctx, dcLocationNum, func(conf *config.Config, serverName string) { - conf.EnableLocalTSO = true - conf.Labels[config.ZoneLabel] = dcLocationConfig[serverName] - }) + cluster, err := tests.NewTestCluster(ctx, 3) defer cluster.Destroy() re.NoError(err) diff --git a/tests/server/tso/allocator_test.go b/tests/server/tso/allocator_test.go deleted file mode 100644 index 257cd3b6a34..00000000000 --- a/tests/server/tso/allocator_test.go +++ /dev/null @@ -1,227 +0,0 @@ -// Copyright 2020 TiKV Project Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -//go:build tso_full_test || tso_function_test -// +build tso_full_test tso_function_test - -package tso_test - -import ( - "context" - "strconv" - "sync" - "testing" - "time" - - "github.com/pingcap/kvproto/pkg/pdpb" - "github.com/stretchr/testify/require" - "github.com/tikv/pd/pkg/slice" - "github.com/tikv/pd/pkg/tso" - "github.com/tikv/pd/pkg/utils/etcdutil" - "github.com/tikv/pd/pkg/utils/testutil" - "github.com/tikv/pd/server/config" - "github.com/tikv/pd/tests" -) - -func TestAllocatorLeader(t *testing.T) { - re := require.New(t) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - // There will be three Local TSO Allocator leaders elected - dcLocationConfig := map[string]string{ - "pd2": "dc-1", - "pd4": "dc-2", - "pd6": "leader", /* Test dc-location name is same as the special key */ - } - dcLocationNum := len(dcLocationConfig) - cluster, err := tests.NewTestCluster(ctx, dcLocationNum*2, func(conf *config.Config, serverName string) { - if zoneLabel, ok := dcLocationConfig[serverName]; ok { - conf.EnableLocalTSO = true - conf.Labels[config.ZoneLabel] = zoneLabel - } - }) - re.NoError(err) - defer cluster.Destroy() - re.NoError(cluster.RunInitialServers()) - cluster.WaitAllLeaders(re, dcLocationConfig) - // To check whether we have enough Local TSO Allocator leaders - allAllocatorLeaders := make([]tso.Allocator, 0, dcLocationNum) - for _, server := range cluster.GetServers() { - // Filter out Global TSO Allocator and Local TSO Allocator followers - allocators := server.GetTSOAllocatorManager().GetAllocators( - tso.FilterDCLocation(tso.GlobalDCLocation), - tso.FilterUnavailableLeadership(), - tso.FilterUninitialized()) - // One PD server will have at most three initialized Local TSO Allocators, - // which also means three allocator leaders - re.LessOrEqual(len(allocators), dcLocationNum) - if len(allocators) == 0 { - continue - } - if len(allAllocatorLeaders) == 0 { - allAllocatorLeaders = append(allAllocatorLeaders, allocators...) 
- continue - } - for _, allocator := range allocators { - if slice.NoneOf(allAllocatorLeaders, func(i int) bool { return allAllocatorLeaders[i] == allocator }) { - allAllocatorLeaders = append(allAllocatorLeaders, allocator) - } - } - } - // At the end, we should have three initialized Local TSO Allocator, - // i.e., the Local TSO Allocator leaders for all dc-locations in testDCLocations - re.Len(allAllocatorLeaders, dcLocationNum) - allocatorLeaderMemberIDs := make([]uint64, 0, dcLocationNum) - for _, allocator := range allAllocatorLeaders { - allocatorLeader, _ := allocator.(*tso.LocalTSOAllocator) - allocatorLeaderMemberIDs = append(allocatorLeaderMemberIDs, allocatorLeader.GetMember().ID()) - } - for _, server := range cluster.GetServers() { - // Filter out Global TSO Allocator - allocators := server.GetTSOAllocatorManager().GetAllocators(tso.FilterDCLocation(tso.GlobalDCLocation)) - if _, ok := dcLocationConfig[server.GetServer().Name()]; !ok { - re.Empty(allocators) - continue - } - re.Len(allocators, dcLocationNum) - for _, allocator := range allocators { - allocatorFollower, _ := allocator.(*tso.LocalTSOAllocator) - allocatorFollowerMemberID := allocatorFollower.GetAllocatorLeader().GetMemberId() - re.True( - slice.AnyOf( - allocatorLeaderMemberIDs, - func(i int) bool { return allocatorLeaderMemberIDs[i] == allocatorFollowerMemberID }, - ), - ) - } - } -} - -func TestPriorityAndDifferentLocalTSO(t *testing.T) { - re := require.New(t) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - dcLocationConfig := map[string]string{ - "pd1": "dc-1", - "pd2": "dc-2", - "pd3": "dc-3", - } - dcLocationNum := len(dcLocationConfig) - cluster, err := tests.NewTestCluster(ctx, dcLocationNum, func(conf *config.Config, serverName string) { - conf.EnableLocalTSO = true - conf.Labels[config.ZoneLabel] = dcLocationConfig[serverName] - }) - defer cluster.Destroy() - re.NoError(err) - re.NoError(cluster.RunInitialServers()) - - cluster.WaitAllLeaders(re, dcLocationConfig) - leaderServer := cluster.GetLeaderServer() - re.NotNil(leaderServer) - leaderServer.BootstrapCluster() - - // Wait for all nodes becoming healthy. 
- time.Sleep(time.Second * 5) - - // Join a new dc-location - pd4, err := cluster.Join(ctx, func(conf *config.Config, _ string) { - conf.EnableLocalTSO = true - conf.Labels[config.ZoneLabel] = "dc-4" - }) - re.NoError(err) - re.NoError(pd4.Run()) - dcLocationConfig["pd4"] = "dc-4" - cluster.CheckClusterDCLocation() - re.NotEqual("", cluster.WaitAllocatorLeader( - "dc-4", - tests.WithRetryTimes(90), tests.WithWaitInterval(time.Second), - )) - - // Scatter the Local TSO Allocators to different servers - waitAllocatorPriorityCheck(cluster) - cluster.WaitAllLeaders(re, dcLocationConfig) - - // Before the priority is checked, we may have allocators typology like this: - // pd1: dc-1, dc-2 and dc-3 allocator leader - // pd2: None - // pd3: None - // pd4: dc-4 allocator leader - // After the priority is checked, we should have allocators typology like this: - // pd1: dc-1 allocator leader - // pd2: dc-2 allocator leader - // pd3: dc-3 allocator leader - // pd4: dc-4 allocator leader - wg := sync.WaitGroup{} - wg.Add(len(dcLocationConfig)) - for serverName, dcLocation := range dcLocationConfig { - go func(name, dc string) { - defer wg.Done() - testutil.Eventually(re, func() bool { - return cluster.WaitAllocatorLeader(dc) == name - }, testutil.WithWaitFor(90*time.Second), testutil.WithTickInterval(time.Second)) - }(serverName, dcLocation) - } - wg.Wait() - - for serverName, server := range cluster.GetServers() { - tsoAllocatorManager := server.GetTSOAllocatorManager() - localAllocatorLeaders, err := tsoAllocatorManager.GetHoldingLocalAllocatorLeaders() - re.NoError(err) - for _, localAllocatorLeader := range localAllocatorLeaders { - testTSOSuffix(re, cluster, tsoAllocatorManager, localAllocatorLeader.GetDCLocation()) - } - if serverName == cluster.GetLeader() { - testTSOSuffix(re, cluster, tsoAllocatorManager, tso.GlobalDCLocation) - } - } -} - -func waitAllocatorPriorityCheck(cluster *tests.TestCluster) { - wg := sync.WaitGroup{} - for _, server := range cluster.GetServers() { - wg.Add(1) - go func(s *tests.TestServer) { - s.GetTSOAllocatorManager().PriorityChecker() - wg.Done() - }(server) - } - wg.Wait() -} - -func testTSOSuffix(re *require.Assertions, cluster *tests.TestCluster, am *tso.AllocatorManager, dcLocation string) { - suffixBits := am.GetSuffixBits() - re.Greater(suffixBits, 0) - var suffix int64 - // The suffix of a Global TSO will always be 0 - if dcLocation != tso.GlobalDCLocation { - suffixResp, err := etcdutil.EtcdKVGet( - cluster.GetEtcdClient(), - am.GetLocalTSOSuffixPath(dcLocation)) - re.NoError(err) - re.Len(suffixResp.Kvs, 1) - suffix, err = strconv.ParseInt(string(suffixResp.Kvs[0].Value), 10, 64) - re.NoError(err) - re.GreaterOrEqual(suffixBits, tso.CalSuffixBits(int32(suffix))) - } - allocator, err := am.GetAllocator(dcLocation) - re.NoError(err) - var tso pdpb.Timestamp - testutil.Eventually(re, func() bool { - tso, err = allocator.GenerateTSO(context.Background(), 1) - re.NoError(err) - return tso.GetPhysical() != 0 - }) - // Test whether the TSO has the right suffix - re.Equal(suffix, tso.Logical&((1<<suffixBits)-1)) -} - -func checkAndReturnTimestampResponse(re *require.Assertions, req *pdpb.TsoRequest, resp *pdpb.TsoResponse) *pdpb.Timestamp { - re.Equal(req.GetCount(), resp.GetCount()) - timestamp := resp.GetTimestamp() - re.Positive(timestamp.GetPhysical()) - re.GreaterOrEqual(uint32(timestamp.GetLogical())>>timestamp.GetSuffixBits(), req.GetCount()) - return timestamp -} - -func testGetTimestamp(re *require.Assertions, ctx context.Context, pdCli pdpb.PDClient, req *pdpb.TsoRequest) *pdpb.Timestamp { - tsoClient, err := pdCli.Tso(ctx) - re.NoError(err) - defer tsoClient.CloseSend() - re.NoError(tsoClient.Send(req)) - resp, err := tsoClient.Recv() - re.NoError(err) - return checkAndReturnTimestampResponse(re, req, resp) -} - -func TestMain(m *testing.M) { -
goleak.VerifyTestMain(m, testutil.LeakOptions...) -} diff --git a/tests/server/tso/consistency_test.go b/tests/server/tso/consistency_test.go deleted file mode 100644 index c7acc69fa60..00000000000 --- a/tests/server/tso/consistency_test.go +++ /dev/null @@ -1,344 +0,0 @@ -// Copyright 2021 TiKV Project Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build tso_full_test || tso_consistency_test -// +build tso_full_test tso_consistency_test - -package tso_test - -import ( - "context" - "sync" - "testing" - "time" - - "github.com/pingcap/failpoint" - "github.com/pingcap/kvproto/pkg/pdpb" - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" - "github.com/tikv/pd/pkg/tso" - "github.com/tikv/pd/pkg/utils/grpcutil" - "github.com/tikv/pd/pkg/utils/syncutil" - "github.com/tikv/pd/pkg/utils/testutil" - "github.com/tikv/pd/pkg/utils/tsoutil" - "github.com/tikv/pd/server/config" - "github.com/tikv/pd/tests" -) - -type tsoConsistencyTestSuite struct { - suite.Suite - ctx context.Context - cancel context.CancelFunc - - leaderServer *tests.TestServer - dcClientMap map[string]pdpb.PDClient - - tsPoolMutex syncutil.Mutex - tsPool map[uint64]struct{} -} - -func TestTSOConsistencyTestSuite(t *testing.T) { - suite.Run(t, new(tsoConsistencyTestSuite)) -} - -func (suite *tsoConsistencyTestSuite) SetupSuite() { - suite.ctx, suite.cancel = context.WithCancel(context.Background()) - suite.dcClientMap = make(map[string]pdpb.PDClient) - suite.tsPool = make(map[uint64]struct{}) -} - -func (suite *tsoConsistencyTestSuite) TearDownSuite() { - suite.cancel() -} - -// TestSynchronizedGlobalTSO is used to test the synchronized way of global TSO generation. 
-func (suite *tsoConsistencyTestSuite) TestSynchronizedGlobalTSO() { - re := suite.Require() - dcLocationConfig := map[string]string{ - "pd1": "dc-1", - "pd2": "dc-2", - "pd3": "dc-3", - } - dcLocationNum := len(dcLocationConfig) - cluster, err := tests.NewTestCluster(suite.ctx, dcLocationNum, func(conf *config.Config, serverName string) { - conf.EnableLocalTSO = true - conf.Labels[config.ZoneLabel] = dcLocationConfig[serverName] - }) - defer cluster.Destroy() - re.NoError(err) - re.NoError(cluster.RunInitialServers()) - - cluster.WaitAllLeaders(re, dcLocationConfig) - - suite.leaderServer = cluster.GetLeaderServer() - re.NotNil(suite.leaderServer) - suite.dcClientMap[tso.GlobalDCLocation] = testutil.MustNewGrpcClient(re, suite.leaderServer.GetAddr()) - for _, dcLocation := range dcLocationConfig { - pdName := suite.leaderServer.GetAllocatorLeader(dcLocation).GetName() - suite.dcClientMap[dcLocation] = testutil.MustNewGrpcClient(re, cluster.GetServer(pdName).GetAddr()) - } - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - maxGlobalTSO := &pdpb.Timestamp{} - for range tsoRequestRound { - // Get some local TSOs first - oldLocalTSOs := make([]*pdpb.Timestamp, 0, dcLocationNum) - for _, dcLocation := range dcLocationConfig { - localTSO := suite.getTimestampByDC(ctx, re, cluster, dcLocation) - oldLocalTSOs = append(oldLocalTSOs, localTSO) - re.Equal(-1, tsoutil.CompareTimestamp(maxGlobalTSO, localTSO)) - } - // Get a global TSO then - globalTSO := suite.getTimestampByDC(ctx, re, cluster, tso.GlobalDCLocation) - for _, oldLocalTSO := range oldLocalTSOs { - re.Equal(1, tsoutil.CompareTimestamp(globalTSO, oldLocalTSO)) - } - if tsoutil.CompareTimestamp(maxGlobalTSO, globalTSO) < 0 { - maxGlobalTSO = globalTSO - } - // Get some local TSOs again - newLocalTSOs := make([]*pdpb.Timestamp, 0, dcLocationNum) - for _, dcLocation := range dcLocationConfig { - newLocalTSOs = append(newLocalTSOs, suite.getTimestampByDC(ctx, re, cluster, dcLocation)) - } - for _, newLocalTSO := range newLocalTSOs { - re.Equal(-1, tsoutil.CompareTimestamp(maxGlobalTSO, newLocalTSO)) - } - } -} - -func (suite *tsoConsistencyTestSuite) getTimestampByDC( - ctx context.Context, - re *require.Assertions, - cluster *tests.TestCluster, - dcLocation string, -) *pdpb.Timestamp { - req := &pdpb.TsoRequest{ - Header: testutil.NewRequestHeader(suite.leaderServer.GetClusterID()), - Count: tsoCount, - DcLocation: dcLocation, - } - pdClient, ok := suite.dcClientMap[dcLocation] - re.True(ok) - forwardedHost := cluster.GetServer(suite.leaderServer.GetAllocatorLeader(dcLocation).GetName()).GetAddr() - ctx = grpcutil.BuildForwardContext(ctx, forwardedHost) - tsoClient, err := pdClient.Tso(ctx) - re.NoError(err) - defer tsoClient.CloseSend() - re.NoError(tsoClient.Send(req)) - resp, err := tsoClient.Recv() - re.NoError(err) - return checkAndReturnTimestampResponse(re, req, resp) -} - -func (suite *tsoConsistencyTestSuite) TestSynchronizedGlobalTSOOverflow() { - re := suite.Require() - dcLocationConfig := map[string]string{ - "pd1": "dc-1", - "pd2": "dc-2", - "pd3": "dc-3", - } - dcLocationNum := len(dcLocationConfig) - cluster, err := tests.NewTestCluster(suite.ctx, dcLocationNum, func(conf *config.Config, serverName string) { - conf.EnableLocalTSO = true - conf.Labels[config.ZoneLabel] = dcLocationConfig[serverName] - }) - defer cluster.Destroy() - re.NoError(err) - re.NoError(cluster.RunInitialServers()) - - cluster.WaitAllLeaders(re, dcLocationConfig) - - suite.leaderServer = cluster.GetLeaderServer() - 
re.NotNil(suite.leaderServer) - suite.dcClientMap[tso.GlobalDCLocation] = testutil.MustNewGrpcClient(re, suite.leaderServer.GetAddr()) - for _, dcLocation := range dcLocationConfig { - pdName := suite.leaderServer.GetAllocatorLeader(dcLocation).GetName() - suite.dcClientMap[dcLocation] = testutil.MustNewGrpcClient(re, cluster.GetServer(pdName).GetAddr()) - } - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/globalTSOOverflow", `return(true)`)) - suite.getTimestampByDC(ctx, re, cluster, tso.GlobalDCLocation) - re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/globalTSOOverflow")) -} - -func (suite *tsoConsistencyTestSuite) TestLocalAllocatorLeaderChange() { - re := suite.Require() - re.NoError(failpoint.Enable("github.com/tikv/pd/server/mockLocalAllocatorLeaderChange", `return(true)`)) - dcLocationConfig := map[string]string{ - "pd1": "dc-1", - } - dcLocationNum := len(dcLocationConfig) - cluster, err := tests.NewTestCluster(suite.ctx, dcLocationNum, func(conf *config.Config, serverName string) { - conf.EnableLocalTSO = true - conf.Labels[config.ZoneLabel] = dcLocationConfig[serverName] - }) - defer cluster.Destroy() - re.NoError(err) - re.NoError(cluster.RunInitialServers()) - - cluster.WaitAllLeaders(re, dcLocationConfig) - - suite.leaderServer = cluster.GetLeaderServer() - re.NotNil(suite.leaderServer) - suite.dcClientMap[tso.GlobalDCLocation] = testutil.MustNewGrpcClient(re, suite.leaderServer.GetAddr()) - for _, dcLocation := range dcLocationConfig { - pdName := suite.leaderServer.GetAllocatorLeader(dcLocation).GetName() - suite.dcClientMap[dcLocation] = testutil.MustNewGrpcClient(re, cluster.GetServer(pdName).GetAddr()) - } - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - suite.getTimestampByDC(ctx, re, cluster, tso.GlobalDCLocation) - re.NoError(failpoint.Disable("github.com/tikv/pd/server/mockLocalAllocatorLeaderChange")) -} - -func (suite *tsoConsistencyTestSuite) TestLocalTSO() { - re := suite.Require() - dcLocationConfig := map[string]string{ - "pd1": "dc-1", - "pd2": "dc-2", - "pd3": "dc-3", - } - dcLocationNum := len(dcLocationConfig) - cluster, err := tests.NewTestCluster(suite.ctx, dcLocationNum, func(conf *config.Config, serverName string) { - conf.EnableLocalTSO = true - conf.Labels[config.ZoneLabel] = dcLocationConfig[serverName] - }) - defer cluster.Destroy() - re.NoError(err) - re.NoError(cluster.RunInitialServers()) - - cluster.WaitAllLeaders(re, dcLocationConfig) - suite.testTSO(cluster, dcLocationConfig, nil) -} - -func (suite *tsoConsistencyTestSuite) checkTSOUnique(tso *pdpb.Timestamp) bool { - suite.tsPoolMutex.Lock() - defer suite.tsPoolMutex.Unlock() - ts := tsoutil.GenerateTS(tso) - if _, exist := suite.tsPool[ts]; exist { - return false - } - suite.tsPool[ts] = struct{}{} - return true -} - -func (suite *tsoConsistencyTestSuite) TestLocalTSOAfterMemberChanged() { - re := suite.Require() - dcLocationConfig := map[string]string{ - "pd1": "dc-1", - "pd2": "dc-2", - "pd3": "dc-3", - } - dcLocationNum := len(dcLocationConfig) - cluster, err := tests.NewTestCluster(suite.ctx, dcLocationNum, func(conf *config.Config, serverName string) { - conf.EnableLocalTSO = true - conf.Labels[config.ZoneLabel] = dcLocationConfig[serverName] - }) - defer cluster.Destroy() - re.NoError(err) - re.NoError(cluster.RunInitialServers()) - - cluster.WaitAllLeaders(re, dcLocationConfig) - - leaderServer := cluster.GetLeaderServer() - leaderCli := 
testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) - req := &pdpb.TsoRequest{ - Header: testutil.NewRequestHeader(cluster.GetCluster().GetId()), - Count: tsoCount, - DcLocation: tso.GlobalDCLocation, - } - ctx, cancel := context.WithCancel(context.Background()) - ctx = grpcutil.BuildForwardContext(ctx, leaderServer.GetAddr()) - previousTS := testGetTimestamp(re, ctx, leaderCli, req) - cancel() - - // Wait for all nodes becoming healthy. - time.Sleep(time.Second * 5) - - // Mock the situation that the system time of PD nodes in dc-4 is slower than others. - re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/systemTimeSlow", `return(true)`)) - - // Join a new dc-location - pd4, err := cluster.Join(suite.ctx, func(conf *config.Config, _ string) { - conf.EnableLocalTSO = true - conf.Labels[config.ZoneLabel] = "dc-4" - }) - re.NoError(err) - re.NoError(pd4.Run()) - dcLocationConfig["pd4"] = "dc-4" - cluster.CheckClusterDCLocation() - re.NotEqual("", cluster.WaitAllocatorLeader( - "dc-4", - tests.WithRetryTimes(90), tests.WithWaitInterval(time.Second), - )) - suite.testTSO(cluster, dcLocationConfig, previousTS) - - re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/systemTimeSlow")) -} - -func (suite *tsoConsistencyTestSuite) testTSO(cluster *tests.TestCluster, dcLocationConfig map[string]string, previousTS *pdpb.Timestamp) { - re := suite.Require() - leaderServer := cluster.GetLeaderServer() - dcClientMap := make(map[string]pdpb.PDClient) - for _, dcLocation := range dcLocationConfig { - pdName := leaderServer.GetAllocatorLeader(dcLocation).GetName() - dcClientMap[dcLocation] = testutil.MustNewGrpcClient(re, cluster.GetServer(pdName).GetAddr()) - } - - var wg sync.WaitGroup - wg.Add(tsoRequestConcurrencyNumber) - for range tsoRequestConcurrencyNumber { - go func() { - defer wg.Done() - lastList := make(map[string]*pdpb.Timestamp) - for _, dcLocation := range dcLocationConfig { - lastList[dcLocation] = &pdpb.Timestamp{ - Physical: 0, - Logical: 0, - } - } - for range tsoRequestRound { - for _, dcLocation := range dcLocationConfig { - req := &pdpb.TsoRequest{ - Header: testutil.NewRequestHeader(leaderServer.GetClusterID()), - Count: tsoCount, - DcLocation: dcLocation, - } - ctx, cancel := context.WithCancel(context.Background()) - ctx = grpcutil.BuildForwardContext(ctx, cluster.GetServer(leaderServer.GetAllocatorLeader(dcLocation).GetName()).GetAddr()) - ts := testGetTimestamp(re, ctx, dcClientMap[dcLocation], req) - cancel() - lastTS := lastList[dcLocation] - // Check whether the TSO fallbacks - re.Equal(1, tsoutil.CompareTimestamp(ts, lastTS)) - if previousTS != nil { - // Because we have a Global TSO synchronization, even though the system time - // of the PD nodes in dc-4 is slower, its TSO will still be big enough. - re.Equal(1, tsoutil.CompareTimestamp(ts, previousTS)) - } - lastList[dcLocation] = ts - // Check whether the TSO is not unique - re.True(suite.checkTSOUnique(ts)) - } - time.Sleep(10 * time.Millisecond) - } - }() - } - wg.Wait() -} diff --git a/tests/server/tso/global_tso_test.go b/tests/server/tso/global_tso_test.go deleted file mode 100644 index bf39c57e3d6..00000000000 --- a/tests/server/tso/global_tso_test.go +++ /dev/null @@ -1,181 +0,0 @@ -// Copyright 2020 TiKV Project Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build tso_full_test || tso_function_test -// +build tso_full_test tso_function_test - -package tso_test - -import ( - "context" - "testing" - "time" - - "github.com/pingcap/failpoint" - "github.com/pingcap/kvproto/pkg/pdpb" - "github.com/stretchr/testify/require" - "github.com/tikv/pd/pkg/tso" - "github.com/tikv/pd/pkg/utils/grpcutil" - "github.com/tikv/pd/pkg/utils/testutil" - "github.com/tikv/pd/pkg/utils/typeutil" - "github.com/tikv/pd/server/config" - "github.com/tikv/pd/tests" -) - -// There are three kinds of ways to generate a TSO: -// 1. Normal Global TSO, the normal way to get a global TSO from the PD leader, -// a.k.a the single Global TSO Allocator. -// 2. Normal Local TSO, the new way to get a local TSO may from any of PD servers, -// a.k.a the Local TSO Allocator leader. -// 3. Synchronized global TSO, the new way to get a global TSO from the PD leader, -// which will coordinate and synchronize a TSO with other Local TSO Allocator -// leaders. - -func TestRequestFollower(t *testing.T) { - re := require.New(t) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - cluster, err := tests.NewTestCluster(ctx, 2) - re.NoError(err) - defer cluster.Destroy() - - re.NoError(cluster.RunInitialServers()) - re.NotEmpty(cluster.WaitLeader()) - - var followerServer *tests.TestServer - for _, s := range cluster.GetServers() { - if s.GetConfig().Name != cluster.GetLeader() { - followerServer = s - } - } - re.NotNil(followerServer) - - grpcPDClient := testutil.MustNewGrpcClient(re, followerServer.GetAddr()) - clusterID := followerServer.GetClusterID() - req := &pdpb.TsoRequest{ - Header: testutil.NewRequestHeader(clusterID), - Count: 1, - DcLocation: tso.GlobalDCLocation, - } - ctx = grpcutil.BuildForwardContext(ctx, followerServer.GetAddr()) - tsoClient, err := grpcPDClient.Tso(ctx) - re.NoError(err) - defer tsoClient.CloseSend() - - start := time.Now() - re.NoError(tsoClient.Send(req)) - _, err = tsoClient.Recv() - re.Error(err) - re.Contains(err.Error(), "generate timestamp failed") - - // Requesting follower should fail fast, or the unavailable time will be - // too long. - re.Less(time.Since(start), time.Second) -} - -// In some cases, when a TSO request arrives, the SyncTimestamp may not finish yet. -// This test is used to simulate this situation and verify that the retry mechanism. 
-func TestDelaySyncTimestamp(t *testing.T) { - re := require.New(t) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - cluster, err := tests.NewTestCluster(ctx, 2) - re.NoError(err) - defer cluster.Destroy() - re.NoError(cluster.RunInitialServers()) - re.NotEmpty(cluster.WaitLeader()) - - var leaderServer, nextLeaderServer *tests.TestServer - leaderServer = cluster.GetLeaderServer() - re.NotNil(leaderServer) - leaderServer.BootstrapCluster() - for _, s := range cluster.GetServers() { - if s.GetConfig().Name != cluster.GetLeader() { - nextLeaderServer = s - } - } - re.NotNil(nextLeaderServer) - - grpcPDClient := testutil.MustNewGrpcClient(re, nextLeaderServer.GetAddr()) - clusterID := nextLeaderServer.GetClusterID() - req := &pdpb.TsoRequest{ - Header: testutil.NewRequestHeader(clusterID), - Count: 1, - DcLocation: tso.GlobalDCLocation, - } - - re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/delaySyncTimestamp", `return(true)`)) - - // Make the old leader resign and wait for the new leader to get a lease - leaderServer.ResignLeader() - re.True(nextLeaderServer.WaitLeader()) - - ctx = grpcutil.BuildForwardContext(ctx, nextLeaderServer.GetAddr()) - tsoClient, err := grpcPDClient.Tso(ctx) - re.NoError(err) - defer tsoClient.CloseSend() - re.NoError(tsoClient.Send(req)) - resp, err := tsoClient.Recv() - re.NoError(err) - re.NotNil(checkAndReturnTimestampResponse(re, req, resp)) - re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/delaySyncTimestamp")) -} - -func TestLogicalOverflow(t *testing.T) { - re := require.New(t) - - runCase := func(updateInterval time.Duration) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - cluster, err := tests.NewTestCluster(ctx, 1, func(conf *config.Config, _ string) { - conf.TSOUpdatePhysicalInterval = typeutil.Duration{Duration: updateInterval} - }) - defer cluster.Destroy() - re.NoError(err) - re.NoError(cluster.RunInitialServers()) - re.NotEmpty(cluster.WaitLeader()) - - leaderServer := cluster.GetLeaderServer() - re.NotNil(leaderServer) - leaderServer.BootstrapCluster() - grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) - clusterID := leaderServer.GetClusterID() - - tsoClient, err := grpcPDClient.Tso(ctx) - re.NoError(err) - defer tsoClient.CloseSend() - - begin := time.Now() - for i := range 3 { - req := &pdpb.TsoRequest{ - Header: testutil.NewRequestHeader(clusterID), - Count: 150000, - DcLocation: tso.GlobalDCLocation, - } - re.NoError(tsoClient.Send(req)) - _, err = tsoClient.Recv() - re.NoError(err) - if i == 1 { - // the 2nd request may (but not must) overflow, as max logical interval is 262144 - re.Less(time.Since(begin), updateInterval+50*time.Millisecond) // additional 50ms for gRPC latency - } - } - // the 3rd request must overflow - re.GreaterOrEqual(time.Since(begin), updateInterval) - } - - for _, updateInterval := range []int{1, 5, 30, 50} { - runCase(time.Duration(updateInterval) * time.Millisecond) - } -} diff --git a/tests/server/tso/manager_test.go b/tests/server/tso/manager_test.go deleted file mode 100644 index 1feb74e6643..00000000000 --- a/tests/server/tso/manager_test.go +++ /dev/null @@ -1,188 +0,0 @@ -// Copyright 2020 TiKV Project Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build tso_full_test || tso_function_test -// +build tso_full_test tso_function_test - -package tso_test - -import ( - "context" - "strconv" - "testing" - "time" - - "github.com/pingcap/failpoint" - "github.com/stretchr/testify/require" - "github.com/tikv/pd/pkg/tso" - "github.com/tikv/pd/pkg/utils/etcdutil" - "github.com/tikv/pd/pkg/utils/testutil" - "github.com/tikv/pd/server/config" - "github.com/tikv/pd/tests" - clientv3 "go.etcd.io/etcd/client/v3" -) - -// TestClusterDCLocations will write different dc-locations to each server -// and test whether we can get the whole dc-location config from each server. -func TestClusterDCLocations(t *testing.T) { - re := require.New(t) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - testCase := struct { - dcLocationNumber int - dcLocationConfig map[string]string - }{ - dcLocationNumber: 3, - dcLocationConfig: map[string]string{ - "pd1": "dc-1", - "pd2": "dc-1", - "pd3": "dc-2", - "pd4": "dc-2", - "pd5": "dc-3", - "pd6": "dc-3", - }, - } - serverNumber := len(testCase.dcLocationConfig) - cluster, err := tests.NewTestCluster(ctx, serverNumber, func(conf *config.Config, serverName string) { - conf.EnableLocalTSO = true - conf.Labels[config.ZoneLabel] = testCase.dcLocationConfig[serverName] - }) - defer cluster.Destroy() - re.NoError(err) - re.NoError(cluster.RunInitialServers()) - - cluster.WaitAllLeaders(re, testCase.dcLocationConfig) - serverNameMap := make(map[uint64]string) - for _, server := range cluster.GetServers() { - serverNameMap[server.GetServerID()] = server.GetServer().Name() - // To speed up the test, we force to do the check - server.GetTSOAllocatorManager().ClusterDCLocationChecker() - } - // Start to check every server's GetClusterDCLocations() result - for _, server := range cluster.GetServers() { - obtainedServerNumber := 0 - dcLocationMap := server.GetTSOAllocatorManager().GetClusterDCLocations() - re.NoError(err) - re.Len(dcLocationMap, testCase.dcLocationNumber) - for obtainedDCLocation, info := range dcLocationMap { - obtainedServerNumber += len(info.ServerIDs) - for _, serverID := range info.ServerIDs { - expectedDCLocation, exist := testCase.dcLocationConfig[serverNameMap[serverID]] - re.True(exist) - re.Equal(expectedDCLocation, obtainedDCLocation) - } - } - re.Equal(serverNumber, obtainedServerNumber) - } -} - -func TestLocalTSOSuffix(t *testing.T) { - re := require.New(t) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - testCase := struct { - dcLocations []string - dcLocationConfig map[string]string - }{ - dcLocations: []string{"dc-1", "dc-2", "dc-3"}, - dcLocationConfig: map[string]string{ - "pd1": "dc-1", - "pd2": "dc-1", - "pd3": "dc-2", - "pd4": "dc-2", - "pd5": "dc-3", - "pd6": "dc-3", - }, - } - serverNumber := len(testCase.dcLocationConfig) - cluster, err := tests.NewTestCluster(ctx, serverNumber, func(conf *config.Config, serverName string) { - conf.EnableLocalTSO = true - conf.Labels[config.ZoneLabel] = testCase.dcLocationConfig[serverName] - }) - defer cluster.Destroy() - re.NoError(err) - re.NoError(cluster.RunInitialServers()) - - 
cluster.WaitAllLeaders(re, testCase.dcLocationConfig) - - tsoAllocatorManager := cluster.GetServer("pd1").GetTSOAllocatorManager() - for _, dcLocation := range testCase.dcLocations { - suffixResp, err := etcdutil.EtcdKVGet( - cluster.GetEtcdClient(), - tsoAllocatorManager.GetLocalTSOSuffixPath(dcLocation)) - re.NoError(err) - re.Len(suffixResp.Kvs, 1) - // Test the increment of the suffix - allSuffixResp, err := etcdutil.EtcdKVGet( - cluster.GetEtcdClient(), - tsoAllocatorManager.GetLocalTSOSuffixPathPrefix(), - clientv3.WithPrefix(), - clientv3.WithSort(clientv3.SortByValue, clientv3.SortAscend)) - re.NoError(err) - re.Len(allSuffixResp.Kvs, len(testCase.dcLocations)) - var lastSuffixNum int64 - for _, kv := range allSuffixResp.Kvs { - suffixNum, err := strconv.ParseInt(string(kv.Value), 10, 64) - re.NoError(err) - re.Greater(suffixNum, lastSuffixNum) - lastSuffixNum = suffixNum - } - } -} - -func TestNextLeaderKey(t *testing.T) { - re := require.New(t) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - tso.PriorityCheck = 5 * time.Second - defer func() { - tso.PriorityCheck = time.Minute - }() - dcLocationConfig := map[string]string{ - "pd1": "dc-1", - "pd2": "dc-1", - } - serverNum := len(dcLocationConfig) - cluster, err := tests.NewTestCluster(ctx, serverNum, func(conf *config.Config, serverName string) { - conf.EnableLocalTSO = true - conf.Labels[config.ZoneLabel] = dcLocationConfig[serverName] - }) - defer cluster.Destroy() - re.NoError(err) - re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/injectNextLeaderKey", "return(true)")) - re.NoError(cluster.RunInitialServers()) - - cluster.WaitLeader(tests.WithWaitInterval(5*time.Second), tests.WithRetryTimes(3)) - // To speed up the test, we force to do the check - cluster.CheckClusterDCLocation() - originName := cluster.WaitAllocatorLeader("dc-1", tests.WithRetryTimes(5), tests.WithWaitInterval(5*time.Second)) - re.Equal("", originName) - re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/injectNextLeaderKey")) - cluster.CheckClusterDCLocation() - originName = cluster.WaitAllocatorLeader("dc-1") - re.NotEqual("", originName) - for name, server := range cluster.GetServers() { - if name == originName { - continue - } - err := server.GetTSOAllocatorManager().TransferAllocatorForDCLocation("dc-1", server.GetServer().GetMember().ID()) - re.NoError(err) - testutil.Eventually(re, func() bool { - cluster.CheckClusterDCLocation() - currName := cluster.WaitAllocatorLeader("dc-1") - return currName == name - }, testutil.WithTickInterval(time.Second)) - return - } -} diff --git a/tests/server/tso/tso_test.go b/tests/server/tso/tso_test.go index fc2f5999840..cb6b87c83d3 100644 --- a/tests/server/tso/tso_test.go +++ b/tests/server/tso/tso_test.go @@ -1,4 +1,4 @@ -// Copyright 2021 TiKV Project Authors. +// Copyright 2024 TiKV Project Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,14 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-//go:build tso_full_test || tso_function_test -// +build tso_full_test tso_function_test - package tso_test import ( "context" "testing" + "time" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/pdpb" @@ -27,116 +25,96 @@ import ( "github.com/tikv/pd/pkg/tso" "github.com/tikv/pd/pkg/utils/grpcutil" "github.com/tikv/pd/pkg/utils/testutil" + "github.com/tikv/pd/pkg/utils/typeutil" "github.com/tikv/pd/server/config" "github.com/tikv/pd/tests" + "go.uber.org/goleak" ) -func TestLoadTimestamp(t *testing.T) { +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m, testutil.LeakOptions...) +} + +func TestRequestFollower(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dcLocationConfig := map[string]string{ - "pd1": "dc-1", - "pd2": "dc-2", - "pd3": "dc-3", - } - dcLocationNum := len(dcLocationConfig) - cluster, err := tests.NewTestCluster(ctx, dcLocationNum, func(conf *config.Config, serverName string) { - conf.EnableLocalTSO = true - conf.Labels[config.ZoneLabel] = dcLocationConfig[serverName] - }) - defer cluster.Destroy() + cluster, err := tests.NewTestCluster(ctx, 2) re.NoError(err) - re.NoError(cluster.RunInitialServers()) - - cluster.WaitAllLeaders(re, dcLocationConfig) - - lastTSMap := requestLocalTSOs(re, cluster, dcLocationConfig) - - re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/systemTimeSlow", `return(true)`)) + defer cluster.Destroy() - // Reboot the cluster. - re.NoError(cluster.StopAll()) re.NoError(cluster.RunInitialServers()) + re.NotEmpty(cluster.WaitLeader()) - cluster.WaitAllLeaders(re, dcLocationConfig) + var followerServer *tests.TestServer + for _, s := range cluster.GetServers() { + if s.GetConfig().Name != cluster.GetLeader() { + followerServer = s + } + } + re.NotNil(followerServer) - // Re-request the Local TSOs. - newTSMap := requestLocalTSOs(re, cluster, dcLocationConfig) - for dcLocation, newTS := range newTSMap { - lastTS, ok := lastTSMap[dcLocation] - re.True(ok) - // The new physical time of TSO should be larger even if the system time is slow. 
- re.Greater(newTS.GetPhysical()-lastTS.GetPhysical(), int64(0)) + grpcPDClient := testutil.MustNewGrpcClient(re, followerServer.GetAddr()) + clusterID := followerServer.GetClusterID() + req := &pdpb.TsoRequest{ + Header: testutil.NewRequestHeader(clusterID), + Count: 1, + DcLocation: tso.GlobalDCLocation, } + ctx = grpcutil.BuildForwardContext(ctx, followerServer.GetAddr()) + tsoClient, err := grpcPDClient.Tso(ctx) + re.NoError(err) + defer tsoClient.CloseSend() - re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/systemTimeSlow")) -} + start := time.Now() + re.NoError(tsoClient.Send(req)) + _, err = tsoClient.Recv() + re.Error(err) + re.Contains(err.Error(), "generate timestamp failed") -func requestLocalTSOs(re *require.Assertions, cluster *tests.TestCluster, dcLocationConfig map[string]string) map[string]*pdpb.Timestamp { - dcClientMap := make(map[string]pdpb.PDClient) - tsMap := make(map[string]*pdpb.Timestamp) - leaderServer := cluster.GetLeaderServer() - for _, dcLocation := range dcLocationConfig { - pdName := leaderServer.GetAllocatorLeader(dcLocation).GetName() - dcClientMap[dcLocation] = testutil.MustNewGrpcClient(re, cluster.GetServer(pdName).GetAddr()) - } - for _, dcLocation := range dcLocationConfig { - req := &pdpb.TsoRequest{ - Header: testutil.NewRequestHeader(leaderServer.GetClusterID()), - Count: tsoCount, - DcLocation: dcLocation, - } - ctx, cancel := context.WithCancel(context.Background()) - ctx = grpcutil.BuildForwardContext(ctx, cluster.GetServer(leaderServer.GetAllocatorLeader(dcLocation).GetName()).GetAddr()) - tsMap[dcLocation] = testGetTimestamp(re, ctx, dcClientMap[dcLocation], req) - cancel() - } - return tsMap + // Requesting follower should fail fast, or the unavailable time will be + // too long. + re.Less(time.Since(start), time.Second) } -func TestDisableLocalTSOAfterEnabling(t *testing.T) { +// In some cases, when a TSO request arrives, the SyncTimestamp may not finish yet. +// This test is used to simulate this situation and verify that the retry mechanism. +func TestDelaySyncTimestamp(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dcLocationConfig := map[string]string{ - "pd1": "dc-1", - "pd2": "dc-2", - "pd3": "dc-3", - } - dcLocationNum := len(dcLocationConfig) - cluster, err := tests.NewTestCluster(ctx, dcLocationNum, func(conf *config.Config, serverName string) { - conf.EnableLocalTSO = true - conf.Labels[config.ZoneLabel] = dcLocationConfig[serverName] - }) - defer cluster.Destroy() + cluster, err := tests.NewTestCluster(ctx, 2) re.NoError(err) + defer cluster.Destroy() re.NoError(cluster.RunInitialServers()) + re.NotEmpty(cluster.WaitLeader()) - cluster.WaitAllLeaders(re, dcLocationConfig) - leaderServer := cluster.GetLeaderServer() + var leaderServer, nextLeaderServer *tests.TestServer + leaderServer = cluster.GetLeaderServer() + re.NotNil(leaderServer) leaderServer.BootstrapCluster() - requestLocalTSOs(re, cluster, dcLocationConfig) - - // Reboot the cluster. - re.NoError(cluster.StopAll()) - for _, server := range cluster.GetServers() { - server.SetEnableLocalTSO(false) + for _, s := range cluster.GetServers() { + if s.GetConfig().Name != cluster.GetLeader() { + nextLeaderServer = s + } } - re.NoError(cluster.RunInitialServers()) - re.NotEmpty(cluster.WaitLeader()) + re.NotNil(nextLeaderServer) - // Re-request the global TSOs. 
- leaderServer = cluster.GetLeaderServer() - grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) - clusterID := leaderServer.GetClusterID() + grpcPDClient := testutil.MustNewGrpcClient(re, nextLeaderServer.GetAddr()) + clusterID := nextLeaderServer.GetClusterID() req := &pdpb.TsoRequest{ Header: testutil.NewRequestHeader(clusterID), Count: 1, DcLocation: tso.GlobalDCLocation, } - ctx = grpcutil.BuildForwardContext(ctx, leaderServer.GetAddr()) + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/delaySyncTimestamp", `return(true)`)) + + // Make the old leader resign and wait for the new leader to get a lease + leaderServer.ResignLeader() + re.True(nextLeaderServer.WaitLeader()) + + ctx = grpcutil.BuildForwardContext(ctx, nextLeaderServer.GetAddr()) tsoClient, err := grpcPDClient.Tso(ctx) re.NoError(err) defer tsoClient.CloseSend() @@ -144,8 +122,60 @@ func TestDisableLocalTSOAfterEnabling(t *testing.T) { resp, err := tsoClient.Recv() re.NoError(err) re.NotNil(checkAndReturnTimestampResponse(re, req, resp)) - // Test whether the number of existing DCs is as expected. - dcLocations, err := leaderServer.GetTSOAllocatorManager().GetClusterDCLocationsFromEtcd() - re.NoError(err) - re.Equal(0, len(dcLocations)) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/delaySyncTimestamp")) +} + +func checkAndReturnTimestampResponse(re *require.Assertions, req *pdpb.TsoRequest, resp *pdpb.TsoResponse) *pdpb.Timestamp { + re.Equal(req.GetCount(), resp.GetCount()) + timestamp := resp.GetTimestamp() + re.Positive(timestamp.GetPhysical()) + re.GreaterOrEqual(uint32(timestamp.GetLogical())>>timestamp.GetSuffixBits(), req.GetCount()) + return timestamp +} +func TestLogicalOverflow(t *testing.T) { + re := require.New(t) + + runCase := func(updateInterval time.Duration) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cluster, err := tests.NewTestCluster(ctx, 1, func(conf *config.Config, _ string) { + conf.TSOUpdatePhysicalInterval = typeutil.Duration{Duration: updateInterval} + }) + defer cluster.Destroy() + re.NoError(err) + re.NoError(cluster.RunInitialServers()) + re.NotEmpty(cluster.WaitLeader()) + + leaderServer := cluster.GetLeaderServer() + re.NotNil(leaderServer) + leaderServer.BootstrapCluster() + grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) + clusterID := leaderServer.GetClusterID() + + tsoClient, err := grpcPDClient.Tso(ctx) + re.NoError(err) + defer tsoClient.CloseSend() + + begin := time.Now() + for i := range 3 { + req := &pdpb.TsoRequest{ + Header: testutil.NewRequestHeader(clusterID), + Count: 150000, + DcLocation: tso.GlobalDCLocation, + } + re.NoError(tsoClient.Send(req)) + _, err = tsoClient.Recv() + re.NoError(err) + if i == 1 { + // the 2nd request may (but not must) overflow, as max logical interval is 262144 + re.Less(time.Since(begin), updateInterval+50*time.Millisecond) // additional 50ms for gRPC latency + } + } + // the 3rd request must overflow + re.GreaterOrEqual(time.Since(begin), updateInterval) + } + + for _, updateInterval := range []int{1, 5, 30, 50} { + runCase(time.Duration(updateInterval) * time.Millisecond) + } } diff --git a/tools/pd-ut/ut.go b/tools/pd-ut/ut.go index df8bff526f2..7bb0cf17e9f 100644 --- a/tools/pd-ut/ut.go +++ b/tools/pd-ut/ut.go @@ -719,8 +719,8 @@ func generateBuildCache() error { return nil } fmt.Println("generate build cache") - // cd cmd/pd-server && go test -tags=tso_function_test,deadlock -exec-=true -vet=off -toolexec=go-compile-without-link - cmd := 
exec.Command("go", "test", "-exec=true", "-vet", "off", "--tags=tso_function_test,deadlock") + // cd cmd/pd-server && go test -tags=deadlock -exec-=true -vet=off -toolexec=go-compile-without-link + cmd := exec.Command("go", "test", "-exec=true", "-vet", "off", "--tags=deadlock") goCompileWithoutLink := fmt.Sprintf("-toolexec=%s", filepath.Join(workDir, "tools", "pd-ut", "go-compile-without-link.sh")) cmd.Dir = filepath.Join(workDir, "cmd", "pd-server") if strings.Contains(workDir, integrationsTestPath) { @@ -745,7 +745,7 @@ func buildTestBinaryMulti(pkgs []string) ([]byte, error) { return nil, withTrace(err) } - // go test --exec=xprog --tags=tso_function_test,deadlock -vet=off --count=0 $(pkgs) + // go test --exec=xprog --tags=deadlock -vet=off --count=0 $(pkgs) // workPath just like `/pd/tests/integrations` xprogPath := filepath.Join(workDir, "bin", "xprog") if strings.Contains(workDir, integrationsTestPath) { @@ -758,7 +758,7 @@ func buildTestBinaryMulti(pkgs []string) ([]byte, error) { // We use 2 * parallel for `go build` to make it faster. p := strconv.Itoa(parallel * 2) - cmd := exec.Command("go", "test", "-p", p, "--exec", xprogPath, "-vet", "off", "--tags=tso_function_test,deadlock") + cmd := exec.Command("go", "test", "-p", p, "--exec", xprogPath, "-vet", "off", "--tags=deadlock") if coverProfile != "" { coverPkg := strings.Join([]string{".", "..."}, string(filepath.Separator)) if strings.Contains(workDir, integrationsTestPath) { @@ -793,7 +793,7 @@ func buildTestBinaryMulti(pkgs []string) ([]byte, error) { func buildTestBinary(pkg string) error { //nolint:gosec - cmd := exec.Command("go", "test", "-c", "-vet", "off", "--tags=tso_function_test,deadlock", "-o", testFileName(pkg), "-v") + cmd := exec.Command("go", "test", "-c", "-vet", "off", "--tags=deadlock", "-o", testFileName(pkg), "-v") if coverProfile != "" { coverPkg := strings.Join([]string{".", "..."}, string(filepath.Separator)) cmd.Args = append(cmd.Args, "-cover", fmt.Sprintf("-coverpkg=%s", coverPkg))