diff --git a/.circleci/config.yml b/.circleci/config.yml index faed15760b..9bc1c6cc1c 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -15,49 +15,6 @@ executors: - image: quay.io/thanos/docker-swift-onlyone-authv2-keystone:v0.1 jobs: - test: - executor: golang-test - environment: - GO111MODULE: "on" - steps: - - git-shallow-clone/checkout - - go/load-cache - - go/mod-download - - run: - name: Download bingo modules - command: | - make install-tool-deps - - go/save-cache - - setup_remote_docker: - version: docker24 - - run: - name: Create Secret if PR is not forked - # GCS integration tests are run only for author's PR that have write access, because these tests - # require credentials. Env variables that sets up these tests will work only for these kind of PRs. - command: | - if ! [ -z ${GCP_PROJECT} ]; then - echo $GOOGLE_APPLICATION_CREDENTIALS_CONTENT > $GOOGLE_APPLICATION_CREDENTIALS - echo "Awesome! GCS and S3 AWS integration tests are enabled." - fi - - run: - name: "Run unit tests." - no_output_timeout: "30m" - environment: - THANOS_TEST_OBJSTORE_SKIP: GCS,S3,AZURE,COS,ALIYUNOSS,BOS,OCI,OBS - # Variables for Swift testing. - OS_AUTH_URL: http://127.0.0.1:5000/v2.0 - OS_PASSWORD: s3cr3t - OS_PROJECT_NAME: admin - OS_REGION_NAME: RegionOne - OS_USERNAME: admin - # taskset sets CPU affinity to 2 (current CPU limit). - command: | - if [ -z ${GCP_PROJECT} ]; then - export THANOS_TEST_OBJSTORE_SKIP=${THANOS_TEST_OBJSTORE_SKIP} - fi - echo "Skipping tests for object storages: ${THANOS_TEST_OBJSTORE_SKIP}" - taskset 2 make test - # Cross build is needed for publish_release but needs to be done outside of docker. cross_build: machine: true @@ -127,19 +84,11 @@ workflows: version: 2 thanos: jobs: - - test: - filters: - tags: - only: /.*/ - publish_main: - requires: - - test filters: branches: only: main - cross_build: - requires: - - test filters: tags: only: /^v[0-9]+(\.[0-9]+){2}(-.+|[^-.]*)$/ @@ -147,7 +96,6 @@ workflows: ignore: /.*/ - publish_release: requires: - - test - cross_build filters: tags: diff --git a/.github/workflows/go.yaml b/.github/workflows/go.yaml index c496be2e26..1decf7fa2d 100644 --- a/.github/workflows/go.yaml +++ b/.github/workflows/go.yaml @@ -7,11 +7,42 @@ on: tags: pull_request: -# TODO(bwplotka): Add tests here. permissions: contents: read jobs: + unit: + runs-on: ubuntu-latest + name: Thanos unit tests + env: + THANOS_TEST_OBJSTORE_SKIP: GCS,S3,AZURE,COS,ALIYUNOSS,BOS,OCI,OBS,SWIFT + OS_AUTH_URL: http://127.0.0.1:5000/v2.0 + OS_PASSWORD: s3cr3t + OS_PROJECT_NAME: admin + OS_REGION_NAME: RegionOne + OS_USERNAME: admin + GOBIN: /tmp/.bin + services: + swift: + image: 'quay.io/thanos/docker-swift-onlyone-authv2-keystone:v0.1' + ports: + - 5000:5000 + steps: + - name: Checkout code + uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7 + + - name: Install Go. + uses: actions/setup-go@0a12ed9d6a96ab950c8f026ed9f722fe0da7ef32 # v5.0.2 + with: + go-version: 1.23.x + + - name: Install bingo modules + run: make install-tool-deps + - name: Add GOBIN to path + run: echo "/tmp/.bin" >> $GITHUB_PATH + - name: Run unit tests + run: make test + cross-build-check: runs-on: ubuntu-latest name: Go build for different platforms diff --git a/go.mod b/go.mod index 83b2096c9e..a704df0868 100644 --- a/go.mod +++ b/go.mod @@ -112,7 +112,7 @@ require ( ) require ( - capnproto.org/go/capnp/v3 v3.0.1-alpha.1 + capnproto.org/go/capnp/v3 v3.0.1-alpha.2.0.20240830165715-46ccd63a72af github.com/cortexproject/promqlsmith v0.0.0-20240506042652-6cfdd9739a5e github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1 github.com/hashicorp/golang-lru/v2 v2.0.7 diff --git a/go.sum b/go.sum index 9242479801..208be6b6b8 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -capnproto.org/go/capnp/v3 v3.0.1-alpha.1 h1:hYEclwXEKsnu+PdHASdx3nLP0fC9kZnR+x1CEvMp9ck= -capnproto.org/go/capnp/v3 v3.0.1-alpha.1/go.mod h1:B+ZjwFmHwTYv201x6CdIo7MmDC/TROJDa00kbjTnv1s= +capnproto.org/go/capnp/v3 v3.0.1-alpha.2.0.20240830165715-46ccd63a72af h1:A5wxH0ZidOtYYUGjhtBaRuB87M73bGfc06uWB8sHpg0= +capnproto.org/go/capnp/v3 v3.0.1-alpha.2.0.20240830165715-46ccd63a72af/go.mod h1:2vT5D2dtG8sJGEoEKU17e+j7shdaYp1Myl8X03B3hmc= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU= cloud.google.com/go v0.44.1/go.mod h1:iSa0KzasP4Uvy3f1mN/7PiObzGgflwredwwASm/v6AU= @@ -2098,8 +2098,8 @@ github.com/ovh/go-ovh v1.6.0 h1:ixLOwxQdzYDx296sXcgS35TOPEahJkpjMGtzPadCjQI= github.com/ovh/go-ovh v1.6.0/go.mod h1:cTVDnl94z4tl8pP1uZ/8jlVxntjSIf09bNcQ5TJSC7c= github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0= github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y= -github.com/philhofer/fwd v1.1.1 h1:GdGcTjf5RNAxwS4QLsiMzJYj5KEvPJD3Abr261yRQXQ= -github.com/philhofer/fwd v1.1.1/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= +github.com/philhofer/fwd v1.1.2 h1:bnDivRJ1EWPjUIRXV5KfORO897HTbpFAQddBdE8t7Gw= +github.com/philhofer/fwd v1.1.2/go.mod h1:qkPdfjR2SIEbspLqpe1tO4n5yICnr2DY7mqEx2tUTP0= github.com/phpdave11/gofpdf v1.4.2/go.mod h1:zpO6xFn9yxo3YLyMvW8HcKWVdbNqgIfOOp2dXMnm1mY= github.com/phpdave11/gofpdi v1.0.12/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk/7bXwjDoI= github.com/phpdave11/gofpdi v1.0.13/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk/7bXwjDoI= @@ -2255,8 +2255,8 @@ github.com/thanos-io/promql-engine v0.0.0-20240921092401-37747eddbd31 h1:xPaP58g github.com/thanos-io/promql-engine v0.0.0-20240921092401-37747eddbd31/go.mod h1:wx0JlRZtsB2S10JYUgeg5GqLfMxw31SzArP+28yyE00= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab/go.mod h1:eheTFp954zcWZXCU8d0AT76ftsQOTo4DTqkN/h3k1MY= -github.com/tinylib/msgp v1.1.5 h1:2gXmtWueD2HefZHQe1QOy9HVzmFrLOVvsXwXBQ0ayy0= -github.com/tinylib/msgp v1.1.5/go.mod h1:eQsjooMTnV42mHu917E26IogZ2930nFyBQdofk10Udg= +github.com/tinylib/msgp v1.1.9 h1:SHf3yoO2sGA0veCJeCBYLHuttAVFHGm2RHgNodW7wQU= +github.com/tinylib/msgp v1.1.9/go.mod h1:BCXGB54lDD8qUEPmiG0cQQUANC4IUQyB2ItS2UDlO/k= github.com/tj/assert v0.0.3 h1:Df/BlaZ20mq6kuai7f5z2TvPFiwC3xaWJSDQNiIS3Rk= github.com/tj/assert v0.0.3/go.mod h1:Ne6X72Q+TB1AteidzQncjw9PabbMp4PBMZ1k+vd1Pvk= github.com/tklauser/go-sysconf v0.3.4/go.mod h1:Cl2c8ZRWfHD5IrfHo9VN+FX9kCFjIOyVklgXycLB6ek= diff --git a/pkg/compact/clean_test.go b/pkg/compact/clean_test.go index cd6135b702..580e151558 100644 --- a/pkg/compact/clean_test.go +++ b/pkg/compact/clean_test.go @@ -25,6 +25,8 @@ import ( ) func TestBestEffortCleanAbortedPartialUploads(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() diff --git a/pkg/compact/compact_test.go b/pkg/compact/compact_test.go index 1d3764907c..4505774651 100644 --- a/pkg/compact/compact_test.go +++ b/pkg/compact/compact_test.go @@ -32,6 +32,8 @@ import ( ) func TestHaltError(t *testing.T) { + t.Parallel() + err := errors.New("test") testutil.Assert(t, !IsHaltError(err), "halt error") @@ -46,6 +48,8 @@ func TestHaltError(t *testing.T) { } func TestHaltMultiError(t *testing.T) { + t.Parallel() + haltErr := halt(errors.New("halt error")) nonHaltErr := errors.New("not a halt error") @@ -59,6 +63,8 @@ func TestHaltMultiError(t *testing.T) { } func TestRetryMultiError(t *testing.T) { + t.Parallel() + retryErr := retry(errors.New("retry error")) nonRetryErr := errors.New("not a retry error") @@ -75,6 +81,8 @@ func TestRetryMultiError(t *testing.T) { } func TestRetryError(t *testing.T) { + t.Parallel() + err := errors.New("test") testutil.Assert(t, !IsRetryError(err), "retry error") @@ -92,6 +100,8 @@ func TestRetryError(t *testing.T) { } func TestGroupKey(t *testing.T) { + t.Parallel() + for _, tcase := range []struct { input metadata.Thanos expected string @@ -131,6 +141,8 @@ func TestGroupKey(t *testing.T) { } func TestGroupMaxMinTime(t *testing.T) { + t.Parallel() + g := &Group{ metasByMinTime: []*metadata.Meta{ {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 10}}, @@ -207,6 +219,8 @@ func createBlockMeta(id uint64, minTime, maxTime int64, labels map[string]string } func TestRetentionProgressCalculate(t *testing.T) { + t.Parallel() + logger := log.NewNopLogger() reg := prometheus.NewRegistry() @@ -328,6 +342,8 @@ func TestRetentionProgressCalculate(t *testing.T) { } func TestCompactProgressCalculate(t *testing.T) { + t.Parallel() + type planResult struct { compactionBlocks, compactionRuns float64 } @@ -433,6 +449,8 @@ func TestCompactProgressCalculate(t *testing.T) { } func TestDownsampleProgressCalculate(t *testing.T) { + t.Parallel() + reg := prometheus.NewRegistry() logger := log.NewNopLogger() @@ -532,6 +550,8 @@ func TestDownsampleProgressCalculate(t *testing.T) { } func TestNoMarkFilterAtomic(t *testing.T) { + t.Parallel() + ctx := context.TODO() logger := log.NewLogfmtLogger(io.Discard) diff --git a/pkg/compact/planner_test.go b/pkg/compact/planner_test.go index 256a8be7bb..9282b51ff0 100644 --- a/pkg/compact/planner_test.go +++ b/pkg/compact/planner_test.go @@ -59,6 +59,8 @@ func (p *tsdbPlannerAdapter) Plan(_ context.Context, metasByMinTime []*metadata. // Adapted from https://github.com/prometheus/prometheus/blob/6c56a1faaaad07317ff585bda75b99bdba0517ad/tsdb/compact_test.go#L167 func TestPlanners_Plan_Compatibility(t *testing.T) { + t.Parallel() + ranges := []int64{ 20, 60, @@ -387,6 +389,8 @@ func TestPlanners_Plan_Compatibility(t *testing.T) { // Adapted form: https://github.com/prometheus/prometheus/blob/6c56a1faaaad07317ff585bda75b99bdba0517ad/tsdb/compact_test.go#L377 func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) { + t.Parallel() + ranges := []int64{ 20, 60, @@ -454,6 +458,8 @@ func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) { } func TestTSDBBasedPlanner_PlanWithNoCompactMarks(t *testing.T) { + t.Parallel() + ranges := []int64{ 20, 60, @@ -646,6 +652,8 @@ func TestTSDBBasedPlanner_PlanWithNoCompactMarks(t *testing.T) { } func TestLargeTotalIndexSizeFilter_Plan(t *testing.T) { + t.Parallel() + ranges := []int64{ 20, 60, diff --git a/pkg/compact/retention_test.go b/pkg/compact/retention_test.go index d80895617c..d883f23fea 100644 --- a/pkg/compact/retention_test.go +++ b/pkg/compact/retention_test.go @@ -28,6 +28,8 @@ import ( ) func TestApplyRetentionPolicyByResolution(t *testing.T) { + t.Parallel() + type testBlock struct { id string minTime time.Time diff --git a/pkg/query/endpointset_test.go b/pkg/query/endpointset_test.go index cc20acf42a..6f061211ab 100644 --- a/pkg/query/endpointset_test.go +++ b/pkg/query/endpointset_test.go @@ -199,6 +199,8 @@ func (e *testEndpoints) CloseOne(addr string) { } func TestTruncateExtLabels(t *testing.T) { + t.Parallel() + const testLength = 10 for _, tc := range []struct { @@ -239,6 +241,8 @@ func TestTruncateExtLabels(t *testing.T) { } func TestEndpointSetUpdate(t *testing.T) { + t.Parallel() + const metricsMeta = ` # HELP thanos_store_nodes_grpc_connections Number of gRPC connection to Store APIs. Opened connection means healthy store APIs available for Querier. # TYPE thanos_store_nodes_grpc_connections gauge @@ -372,6 +376,8 @@ func TestEndpointSetUpdate(t *testing.T) { } func TestEndpointSetUpdate_DuplicateSpecs(t *testing.T) { + t.Parallel() + endpoints, err := startTestEndpoints([]testEndpointMeta{ { InfoResponse: sidecarInfo, @@ -396,6 +402,8 @@ func TestEndpointSetUpdate_DuplicateSpecs(t *testing.T) { } func TestEndpointSetUpdate_EndpointGoingAway(t *testing.T) { + t.Parallel() + endpoints, err := startTestEndpoints([]testEndpointMeta{ { InfoResponse: sidecarInfo, @@ -425,6 +433,8 @@ func TestEndpointSetUpdate_EndpointGoingAway(t *testing.T) { } func TestEndpointSetUpdate_EndpointComingOnline(t *testing.T) { + t.Parallel() + endpoints, err := startTestEndpoints([]testEndpointMeta{ { err: fmt.Errorf("endpoint unavailable"), @@ -454,6 +464,8 @@ func TestEndpointSetUpdate_EndpointComingOnline(t *testing.T) { } func TestEndpointSetUpdate_StrictEndpointMetadata(t *testing.T) { + t.Parallel() + info := sidecarInfo info.Store.MinTime = 111 info.Store.MaxTime = 222 @@ -494,6 +506,8 @@ func TestEndpointSetUpdate_StrictEndpointMetadata(t *testing.T) { } func TestEndpointSetUpdate_PruneInactiveEndpoints(t *testing.T) { + t.Parallel() + testCases := []struct { name string endpoints []testEndpointMeta @@ -561,6 +575,8 @@ func TestEndpointSetUpdate_PruneInactiveEndpoints(t *testing.T) { } func TestEndpointSetUpdate_AtomicEndpointAdditions(t *testing.T) { + t.Parallel() + numResponses := 4 metas := makeInfoResponses(numResponses) metas[1].infoDelay = 2 * time.Second @@ -592,6 +608,8 @@ func TestEndpointSetUpdate_AtomicEndpointAdditions(t *testing.T) { } func TestEndpointSetUpdate_AvailabilityScenarios(t *testing.T) { + t.Parallel() + endpoints, err := startTestEndpoints([]testEndpointMeta{ { InfoResponse: sidecarInfo, @@ -986,6 +1004,8 @@ func TestEndpointSetUpdate_AvailabilityScenarios(t *testing.T) { } func TestEndpointSet_Update_NoneAvailable(t *testing.T) { + t.Parallel() + endpoints, err := startTestEndpoints([]testEndpointMeta{ { InfoResponse: sidecarInfo, @@ -1048,6 +1068,8 @@ func TestEndpointSet_Update_NoneAvailable(t *testing.T) { // TestEndpoint_Update_QuerierStrict tests what happens when the strict mode is enabled/disabled. func TestEndpoint_Update_QuerierStrict(t *testing.T) { + t.Parallel() + endpoints, err := startTestEndpoints([]testEndpointMeta{ { InfoResponse: &infopb.InfoResponse{ @@ -1187,6 +1209,8 @@ func TestEndpoint_Update_QuerierStrict(t *testing.T) { } func TestEndpointSet_APIs_Discovery(t *testing.T) { + t.Parallel() + endpoints, err := startTestEndpoints([]testEndpointMeta{ { InfoResponse: sidecarInfo, @@ -1436,6 +1460,8 @@ func (e *errThatMarshalsToEmptyDict) Error() string { // Test highlights that without wrapping the error, it is marshaled to empty dict {}, not its message. func TestEndpointStringError(t *testing.T) { + t.Parallel() + dictErr := &errThatMarshalsToEmptyDict{msg: "Error message"} stringErr := &stringError{originalErr: dictErr} @@ -1451,6 +1477,8 @@ func TestEndpointStringError(t *testing.T) { // Errors that usually marshal to empty dict should return the original error string. func TestUpdateEndpointStateLastError(t *testing.T) { + t.Parallel() + tcs := []struct { InputError error ExpectedLastErr string @@ -1477,6 +1505,8 @@ func TestUpdateEndpointStateLastError(t *testing.T) { } func TestUpdateEndpointStateForgetsPreviousErrors(t *testing.T) { + t.Parallel() + mockEndpointRef := &endpointRef{ addr: "mockedStore", metadata: &endpointMetadata{ diff --git a/pkg/query/querier_test.go b/pkg/query/querier_test.go index 4e57ea8cc6..794b2e6d63 100644 --- a/pkg/query/querier_test.go +++ b/pkg/query/querier_test.go @@ -44,6 +44,8 @@ type sample struct { } func TestQueryableCreator_MaxResolution(t *testing.T) { + t.Parallel() + testProxy := &testStoreServer{resps: []*storepb.SeriesResponse{}} queryableCreator := NewQueryableCreator(nil, nil, newProxyStore(testProxy), 2, 5*time.Second) @@ -72,6 +74,8 @@ func TestQueryableCreator_MaxResolution(t *testing.T) { // Tests E2E how PromQL works with downsampled data. func TestQuerier_DownsampledData(t *testing.T) { + t.Parallel() + testProxy := &testStoreServer{ resps: []*storepb.SeriesResponse{ storeSeriesResponse(t, labels.FromStrings("__name__", "a", "zzz", "a", "aaa", "bbb"), []sample{{99, 1}, {199, 5}}), // Downsampled chunk from Store. @@ -335,6 +339,8 @@ func (s series) Iterator() chunkenc.Iterator { // // This is because when promql displays data for a given range it looks back 5min before the requested time window. func TestQuerier_Select_AfterPromQL(t *testing.T) { + t.Parallel() + logger := log.NewLogfmtLogger(os.Stderr) for _, tcase := range []struct { @@ -423,6 +429,8 @@ func TestQuerier_Select_AfterPromQL(t *testing.T) { } func TestQuerier_Select(t *testing.T) { + t.Parallel() + logger := log.NewLogfmtLogger(os.Stderr) for _, tcase := range []struct { @@ -1056,6 +1064,8 @@ func (s *mockedSeriesIterator) Next() chunkenc.ValueType { func (s *mockedSeriesIterator) Err() error { return nil } func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) { + t.Parallel() + logger := log.NewLogfmtLogger(os.Stderr) s, err := store.NewLocalStoreFromJSONMmappableFile(logger, component.Debug, nil, "./testdata/issue2401-seriesresponses.json", store.ScanGRPCCurlProtoStreamMessages) diff --git a/pkg/query/query_bench_test.go b/pkg/query/query_bench_test.go index 44e4373a26..bd3368a754 100644 --- a/pkg/query/query_bench_test.go +++ b/pkg/query/query_bench_test.go @@ -28,6 +28,8 @@ import ( // this many times and within different interval e.g // TODO(bwplotka): Add benchmarks with PromQL involvement. func TestQuerySelect(t *testing.T) { + t.Parallel() + tb := testutil.NewTB(t) storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) { benchQuerySelect(t, samplesPerSeries, series, true) diff --git a/pkg/query/remote_engine_test.go b/pkg/query/remote_engine_test.go index 636197b491..2831855bd3 100644 --- a/pkg/query/remote_engine_test.go +++ b/pkg/query/remote_engine_test.go @@ -25,6 +25,8 @@ import ( ) func TestRemoteEngine_Warnings(t *testing.T) { + t.Parallel() + client := NewClient(&warnClient{}, "", nil) engine := NewRemoteEngine(log.NewNopLogger(), client, Opts{ Timeout: 1 * time.Second, @@ -61,6 +63,8 @@ func TestRemoteEngine_Warnings(t *testing.T) { } func TestRemoteEngine_LabelSets(t *testing.T) { + t.Parallel() + tests := []struct { name string tsdbInfos []infopb.TSDBInfo @@ -130,6 +134,8 @@ func TestRemoteEngine_LabelSets(t *testing.T) { } func TestRemoteEngine_MinT(t *testing.T) { + t.Parallel() + tests := []struct { name string tsdbInfos []infopb.TSDBInfo diff --git a/pkg/receive/capnproto_server_test.go b/pkg/receive/capnproto_server_test.go index e054a73ad9..d2e994e313 100644 --- a/pkg/receive/capnproto_server_test.go +++ b/pkg/receive/capnproto_server_test.go @@ -17,6 +17,8 @@ import ( ) func TestCapNProtoServer_SingleConcurrentClient(t *testing.T) { + t.Parallel() + var ( writer = NewCapNProtoWriter( log.NewNopLogger(), @@ -45,6 +47,8 @@ func TestCapNProtoServer_SingleConcurrentClient(t *testing.T) { } func TestCapNProtoServer_MultipleConcurrentClients(t *testing.T) { + t.Parallel() + var ( writer = NewCapNProtoWriter( log.NewNopLogger(), diff --git a/pkg/receive/config_test.go b/pkg/receive/config_test.go index 62435c3283..5ce78e6514 100644 --- a/pkg/receive/config_test.go +++ b/pkg/receive/config_test.go @@ -14,6 +14,8 @@ import ( ) func TestValidateConfig(t *testing.T) { + t.Parallel() + for _, tc := range []struct { name string cfg interface{} @@ -73,6 +75,8 @@ func TestValidateConfig(t *testing.T) { } func TestUnmarshalEndpointSlice(t *testing.T) { + t.Parallel() + cases := []struct { name string json string diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index 44d6306fac..408e0869fb 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -744,6 +744,8 @@ func testReceiveQuorum(t *testing.T, hashringAlgo HashringAlgorithm, withConsist } func TestReceiveQuorumHashmod(t *testing.T) { + t.Parallel() + for _, capnpReplication := range []bool{false, true} { t.Run(fmt.Sprintf("capnproto-replication=%t", capnpReplication), func(t *testing.T) { testReceiveQuorum(t, AlgorithmHashmod, false, capnpReplication) @@ -752,6 +754,8 @@ func TestReceiveQuorumHashmod(t *testing.T) { } func TestReceiveQuorumKetama(t *testing.T) { + t.Parallel() + for _, capnpReplication := range []bool{false, true} { t.Run(fmt.Sprintf("capnproto-replication=%t", capnpReplication), func(t *testing.T) { testReceiveQuorum(t, AlgorithmKetama, false, capnpReplication) @@ -760,6 +764,8 @@ func TestReceiveQuorumKetama(t *testing.T) { } func TestReceiveWithConsistencyDelayHashmod(t *testing.T) { + t.Parallel() + for _, capnpReplication := range []bool{false, true} { t.Run(fmt.Sprintf("capnproto-replication=%t", capnpReplication), func(t *testing.T) { testReceiveQuorum(t, AlgorithmHashmod, true, capnpReplication) @@ -768,6 +774,8 @@ func TestReceiveWithConsistencyDelayHashmod(t *testing.T) { } func TestReceiveWithConsistencyDelayKetama(t *testing.T) { + t.Parallel() + for _, capnpReplication := range []bool{false, true} { t.Run(fmt.Sprintf("capnproto-replication=%t", capnpReplication), func(t *testing.T) { testReceiveQuorum(t, AlgorithmKetama, true, capnpReplication) @@ -776,6 +784,8 @@ func TestReceiveWithConsistencyDelayKetama(t *testing.T) { } func TestReceiveWriteRequestLimits(t *testing.T) { + t.Parallel() + for _, tc := range []struct { name string status int @@ -980,6 +990,8 @@ func BenchmarkHandlerReceiveHTTP(b *testing.B) { } func TestHandlerReceiveHTTP(t *testing.T) { + t.Parallel() + benchmarkHandlerMultiTSDBReceiveRemoteWrite(testutil.NewTB(t)) } @@ -1248,6 +1260,8 @@ func Heap(dir string) (err error) { } func TestIsTenantValid(t *testing.T) { + t.Parallel() + for _, tcase := range []struct { name string tenant string @@ -1295,6 +1309,8 @@ func TestIsTenantValid(t *testing.T) { } func TestRelabel(t *testing.T) { + t.Parallel() + for _, tcase := range []struct { name string relabel []*relabel.Config @@ -1664,6 +1680,8 @@ func TestRelabel(t *testing.T) { } func TestGetStatsLimitParameter(t *testing.T) { + t.Parallel() + t.Run("invalid limit parameter, not integer", func(t *testing.T) { r, err := http.NewRequest(http.MethodGet, "http://0:0", nil) testutil.Ok(t, err) @@ -1710,14 +1728,9 @@ func TestGetStatsLimitParameter(t *testing.T) { }) } -// -//func TestSortedSliceDiff(t *testing.T) { -// testutil.Equals(t, []string{"a"}, getSortedStringSliceDiff([]string{"a", "a", "foo"}, []string{"b", "b", "foo"})) -// testutil.Equals(t, []string{}, getSortedStringSliceDiff([]string{}, []string{"b", "b", "foo"})) -// testutil.Equals(t, []string{}, getSortedStringSliceDiff([]string{}, []string{})) -//} - func TestHashringChangeCallsClose(t *testing.T) { + t.Parallel() + appendables := []*fakeAppendable{ { appender: newFakeAppender(nil, nil, nil), @@ -1746,6 +1759,8 @@ func TestHashringChangeCallsClose(t *testing.T) { } func TestHandlerEarlyStop(t *testing.T) { + t.Parallel() + h := NewHandler(nil, &Options{}) h.Close() @@ -1769,6 +1784,8 @@ func (h *hashringSeenTenants) GetN(tenant string, ts *prompb.TimeSeries, n uint6 } func TestDistributeSeries(t *testing.T) { + t.Parallel() + const tenantIDLabelName = "thanos_tenant_id" h := NewHandler(nil, &Options{ SplitTenantLabelName: tenantIDLabelName, @@ -1804,6 +1821,8 @@ func TestDistributeSeries(t *testing.T) { } func TestHandlerFlippingHashrings(t *testing.T) { + t.Parallel() + h := NewHandler(log.NewLogfmtLogger(os.Stderr), &Options{}) t.Cleanup(h.Close) diff --git a/pkg/receive/hashring_test.go b/pkg/receive/hashring_test.go index 3079490077..0cb2ae4bbe 100644 --- a/pkg/receive/hashring_test.go +++ b/pkg/receive/hashring_test.go @@ -19,6 +19,8 @@ import ( ) func TestHashringGet(t *testing.T) { + t.Parallel() + ts := &prompb.TimeSeries{ Labels: []labelpb.ZLabel{ { @@ -160,6 +162,8 @@ func TestHashringGet(t *testing.T) { } func TestKetamaHashringGet(t *testing.T) { + t.Parallel() + baseTS := &prompb.TimeSeries{ Labels: []labelpb.ZLabel{ { @@ -242,11 +246,15 @@ func TestKetamaHashringGet(t *testing.T) { } func TestKetamaHashringBadConfigIsRejected(t *testing.T) { + t.Parallel() + _, err := newKetamaHashring([]Endpoint{{Address: "node-1"}}, 1, 2) require.Error(t, err) } func TestKetamaHashringConsistency(t *testing.T) { + t.Parallel() + series := makeSeries() ringA := []Endpoint{{Address: "node-1"}, {Address: "node-2"}, {Address: "node-3"}} @@ -267,6 +275,8 @@ func TestKetamaHashringConsistency(t *testing.T) { } func TestKetamaHashringIncreaseAtEnd(t *testing.T) { + t.Parallel() + series := makeSeries() initialRing := []Endpoint{{Address: "node-1"}, {Address: "node-2"}, {Address: "node-3"}} @@ -287,6 +297,8 @@ func TestKetamaHashringIncreaseAtEnd(t *testing.T) { } func TestKetamaHashringIncreaseInMiddle(t *testing.T) { + t.Parallel() + series := makeSeries() initialRing := []Endpoint{{Address: "node-1"}, {Address: "node-3"}} @@ -307,6 +319,8 @@ func TestKetamaHashringIncreaseInMiddle(t *testing.T) { } func TestKetamaHashringReplicationConsistency(t *testing.T) { + t.Parallel() + series := makeSeries() initialRing := []Endpoint{{Address: "node-1"}, {Address: "node-4"}, {Address: "node-5"}} @@ -327,6 +341,8 @@ func TestKetamaHashringReplicationConsistency(t *testing.T) { } func TestKetamaHashringReplicationConsistencyWithAZs(t *testing.T) { + t.Parallel() + for _, tt := range []struct { initialRing []Endpoint resizedRing []Endpoint @@ -379,6 +395,8 @@ func TestKetamaHashringReplicationConsistencyWithAZs(t *testing.T) { } func TestKetamaHashringEvenAZSpread(t *testing.T) { + t.Parallel() + tenant := "default-tenant" ts := &prompb.TimeSeries{ Labels: labelpb.ZLabelsFromPromLabels(labels.FromStrings("foo", "bar")), @@ -482,6 +500,8 @@ func TestKetamaHashringEvenAZSpread(t *testing.T) { } func TestKetamaHashringEvenNodeSpread(t *testing.T) { + t.Parallel() + tenant := "default-tenant" for _, tt := range []struct { @@ -573,6 +593,8 @@ func TestKetamaHashringEvenNodeSpread(t *testing.T) { } func TestInvalidAZHashringCfg(t *testing.T) { + t.Parallel() + for _, tt := range []struct { cfg []HashringConfig replicas uint64 diff --git a/pkg/receive/limiter_config_test.go b/pkg/receive/limiter_config_test.go index 3b6c15876b..a421612e0c 100644 --- a/pkg/receive/limiter_config_test.go +++ b/pkg/receive/limiter_config_test.go @@ -13,6 +13,8 @@ import ( ) func TestParseLimiterConfig(t *testing.T) { + t.Parallel() + tests := []struct { name string configFileName string diff --git a/pkg/receive/limiter_test.go b/pkg/receive/limiter_test.go index 960e567a17..4bb3113aad 100644 --- a/pkg/receive/limiter_test.go +++ b/pkg/receive/limiter_test.go @@ -17,6 +17,8 @@ import ( ) func TestLimiter_StartConfigReloader(t *testing.T) { + t.Parallel() + origLimitsFile, err := os.ReadFile(path.Join("testdata", "limits_config", "good_limits.yaml")) testutil.Ok(t, err) copyLimitsFile := path.Join(t.TempDir(), "limits.yaml") @@ -55,6 +57,8 @@ func (e emptyPathFile) Path() string { } func TestLimiter_CanReload(t *testing.T) { + t.Parallel() + validLimitsPath, err := extkingpin.NewStaticPathContent( path.Join("testdata", "limits_config", "good_limits.yaml"), ) diff --git a/pkg/receive/multitsdb_test.go b/pkg/receive/multitsdb_test.go index eb82c22281..4bee9c0514 100644 --- a/pkg/receive/multitsdb_test.go +++ b/pkg/receive/multitsdb_test.go @@ -39,6 +39,8 @@ import ( ) func TestMultiTSDB(t *testing.T) { + t.Parallel() + dir := t.TempDir() logger := log.NewLogfmtLogger(os.Stderr) @@ -416,6 +418,8 @@ func checkExemplarsResponse(t *testing.T, expected, data []exemplarspb.ExemplarD } func TestMultiTSDBPrune(t *testing.T) { + t.Parallel() + tests := []struct { name string bucket objstore.Bucket @@ -511,6 +515,8 @@ func syncTSDBs(ctx context.Context, m *MultiTSDB, interval time.Duration) error } func TestMultiTSDBRecreatePrunedTenant(t *testing.T) { + t.Parallel() + dir := t.TempDir() m := NewMultiTSDB(dir, log.NewNopLogger(), prometheus.NewRegistry(), @@ -536,6 +542,8 @@ func TestMultiTSDBRecreatePrunedTenant(t *testing.T) { } func TestAlignedHeadFlush(t *testing.T) { + t.Parallel() + hourInSeconds := int64(1 * 60 * 60) tests := []struct { @@ -617,6 +625,8 @@ func TestAlignedHeadFlush(t *testing.T) { } func TestMultiTSDBStats(t *testing.T) { + t.Parallel() + tests := []struct { name string tenants []string @@ -675,6 +685,8 @@ func TestMultiTSDBStats(t *testing.T) { // Regression test for https://github.com/thanos-io/thanos/issues/6047. func TestMultiTSDBWithNilStore(t *testing.T) { + t.Parallel() + dir := t.TempDir() m := NewMultiTSDB(dir, log.NewNopLogger(), prometheus.NewRegistry(), @@ -716,6 +728,8 @@ func (s *slowClient) LabelValues(ctx context.Context, r *storepb.LabelValuesRequ } func TestProxyLabelValues(t *testing.T) { + t.Parallel() + dir := t.TempDir() m := NewMultiTSDB( dir, nil, prometheus.NewRegistry(), &tsdb.Options{ @@ -848,6 +862,8 @@ func BenchmarkMultiTSDB(b *testing.B) { } func TestMultiTSDBDoesNotDeleteNotUploadedBlocks(t *testing.T) { + t.Parallel() + tenant := &tenant{ mtx: &sync.RWMutex{}, } diff --git a/pkg/receive/receive_test.go b/pkg/receive/receive_test.go index ea1b6d81fd..7e90c3c204 100644 --- a/pkg/receive/receive_test.go +++ b/pkg/receive/receive_test.go @@ -26,6 +26,8 @@ func TestMain(m *testing.M) { } func TestAddingExternalLabelsForTenants(t *testing.T) { + t.Parallel() + for _, tc := range []struct { name string cfg []HashringConfig @@ -238,6 +240,8 @@ func TestAddingExternalLabelsForTenants(t *testing.T) { } func TestLabelSetsOfTenantsWhenAddingTenants(t *testing.T) { + t.Parallel() + initialConfig := []HashringConfig{ { Endpoints: []Endpoint{{Address: "node1"}}, @@ -347,6 +351,8 @@ func TestLabelSetsOfTenantsWhenAddingTenants(t *testing.T) { } func TestLabelSetsOfTenantsWhenChangingLabels(t *testing.T) { + t.Parallel() + initialConfig := []HashringConfig{ { Endpoints: []Endpoint{{Address: "node1"}}, @@ -577,6 +583,8 @@ func TestLabelSetsOfTenantsWhenChangingLabels(t *testing.T) { } func TestAddingLabelsWhenTenantAppearsInMultipleHashrings(t *testing.T) { + t.Parallel() + initialConfig := []HashringConfig{ { Endpoints: []Endpoint{{Address: "node1"}}, @@ -745,6 +753,8 @@ func TestAddingLabelsWhenTenantAppearsInMultipleHashrings(t *testing.T) { } func TestReceiverLabelsNotOverwrittenByExternalLabels(t *testing.T) { + t.Parallel() + cfg := []HashringConfig{ { Endpoints: []Endpoint{{Address: "node1"}}, diff --git a/pkg/receive/request_limiter_test.go b/pkg/receive/request_limiter_test.go index 10c131b12f..78a9f82f85 100644 --- a/pkg/receive/request_limiter_test.go +++ b/pkg/receive/request_limiter_test.go @@ -10,6 +10,8 @@ import ( ) func TestRequestLimiter_limitsFor(t *testing.T) { + t.Parallel() + tenantWithLimits := "limited-tenant" tenantWithoutLimits := "unlimited-tenant" @@ -59,6 +61,8 @@ func TestRequestLimiter_limitsFor(t *testing.T) { } func TestRequestLimiter_AllowRequestBodySizeBytes(t *testing.T) { + t.Parallel() + tests := []struct { name string defaultLimits *requestLimitsConfig @@ -117,6 +121,8 @@ func TestRequestLimiter_AllowRequestBodySizeBytes(t *testing.T) { } func TestRequestLimiter_AllowSeries(t *testing.T) { + t.Parallel() + tests := []struct { name string seriesLimit int64 @@ -175,6 +181,8 @@ func TestRequestLimiter_AllowSeries(t *testing.T) { } func TestRequestLimiter_AllowSamples(t *testing.T) { + t.Parallel() + tests := []struct { name string samplesLimit int64 diff --git a/pkg/receive/writer_test.go b/pkg/receive/writer_test.go index 1220e23f72..2db5e6a341 100644 --- a/pkg/receive/writer_test.go +++ b/pkg/receive/writer_test.go @@ -31,6 +31,8 @@ import ( ) func TestWriter(t *testing.T) { + t.Parallel() + now := model.Now() lbls := []labelpb.ZLabel{{Name: "__name__", Value: "test"}} tests := map[string]struct { diff --git a/pkg/reloader/reloader_test.go b/pkg/reloader/reloader_test.go index 35ecb03905..74629d7122 100644 --- a/pkg/reloader/reloader_test.go +++ b/pkg/reloader/reloader_test.go @@ -33,6 +33,8 @@ func TestMain(m *testing.M) { } func TestReloader_ConfigApply(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) defer cancel() @@ -199,6 +201,8 @@ config: } func TestReloader_ConfigRollback(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) defer cancel() @@ -311,6 +315,8 @@ faulty_config: } func TestReloader_ConfigDirApply(t *testing.T) { + t.Parallel() + l, err := net.Listen("tcp", "localhost:0") testutil.Ok(t, err) @@ -612,6 +618,8 @@ func TestReloader_ConfigDirApply(t *testing.T) { } func TestReloader_ConfigDirApplyBasedOnWatchInterval(t *testing.T) { + t.Parallel() + l, err := net.Listen("tcp", "localhost:0") testutil.Ok(t, err) @@ -821,6 +829,8 @@ func TestReloader_ConfigDirApplyBasedOnWatchInterval(t *testing.T) { } func TestReloader_DirectoriesApply(t *testing.T) { + t.Parallel() + l, err := net.Listen("tcp", "localhost:0") testutil.Ok(t, err) @@ -1031,6 +1041,8 @@ func TestReloader_DirectoriesApply(t *testing.T) { } func TestReloader_DirectoriesApplyBasedOnWatchInterval(t *testing.T) { + t.Parallel() + l, err := net.Listen("tcp", "localhost:0") testutil.Ok(t, err) @@ -1166,6 +1178,8 @@ func TestReloader_DirectoriesApplyBasedOnWatchInterval(t *testing.T) { } func TestReloader_ConfigApplyWithWatchIntervalEqualsZero(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) defer cancel() diff --git a/pkg/store/acceptance_test.go b/pkg/store/acceptance_test.go index 9a3cba1311..48fc00adfb 100644 --- a/pkg/store/acceptance_test.go +++ b/pkg/store/acceptance_test.go @@ -41,6 +41,10 @@ import ( "github.com/thanos-io/thanos/pkg/testutil/e2eutil" ) +func TestMain(m *testing.M) { + custom.TolerantVerifyLeakMain(m) +} + type labelNameCallCase struct { matchers []storepb.LabelMatcher start int64 @@ -873,7 +877,6 @@ func testStoreAPIsSeriesSplitSamplesIntoChunksWithMaxSizeOf120(t *testing.T, sta extLset := labels.FromStrings("region", "eu-west") appendFn := func(app storage.Appender) { - var ( ref storage.SeriesRef err error @@ -923,7 +926,8 @@ func testStoreAPIsSeriesSplitSamplesIntoChunksWithMaxSizeOf120(t *testing.T, sta } func TestBucketStore_Acceptance(t *testing.T) { - t.Cleanup(func() { custom.TolerantVerifyLeak(t) }) + t.Parallel() + ctx := context.Background() startStore := func(lazyExpandedPostings bool) func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer { @@ -1018,7 +1022,7 @@ func TestBucketStore_Acceptance(t *testing.T) { } func TestPrometheusStore_Acceptance(t *testing.T) { - t.Cleanup(func() { custom.TolerantVerifyLeak(t) }) + t.Parallel() startStore := func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer { p, err := e2eutil.NewPrometheus() @@ -1051,7 +1055,7 @@ func TestPrometheusStore_Acceptance(t *testing.T) { } func TestTSDBStore_Acceptance(t *testing.T) { - t.Cleanup(func() { custom.TolerantVerifyLeak(t) }) + t.Parallel() startStore := func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer { db, err := e2eutil.NewTSDB() @@ -1069,7 +1073,6 @@ func TestTSDBStore_Acceptance(t *testing.T) { func TestProxyStoreWithTSDBSelector_Acceptance(t *testing.T) { t.Skip("This is a known issue, we need to think how to fix it") - t.Cleanup(func() { custom.TolerantVerifyLeak(t) }) ctx := context.Background() startStore := func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer { @@ -1204,7 +1207,7 @@ func TestProxyStoreWithTSDBSelector_Acceptance(t *testing.T) { } func TestProxyStoreWithReplicas_Acceptance(t *testing.T) { - t.Cleanup(func() { custom.TolerantVerifyLeak(t) }) + t.Parallel() startStore := func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer { startNestedStore := func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer { diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index cec72b374b..bee87898c6 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -555,6 +555,8 @@ func TestBucketStore_ManyParts_e2e(t *testing.T) { } func TestBucketStore_TimePartitioning_e2e(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) defer cancel() bkt := objstore.NewInMemBucket() @@ -608,6 +610,8 @@ func TestBucketStore_TimePartitioning_e2e(t *testing.T) { } func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) { + t.Parallel() + // The query will fetch 2 series from 6 blocks, so we do expect to hit a total of 12 chunks. expectedChunks := uint64(2 * 6) @@ -678,6 +682,8 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) { } func TestBucketStore_Series_CustomBytesLimiters_e2e(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) defer cancel() bkt := objstore.NewInMemBucket() @@ -822,6 +828,8 @@ func TestBucketStore_LabelNames_e2e(t *testing.T) { } func TestBucketStore_LabelNames_SeriesLimiter_e2e(t *testing.T) { + t.Parallel() + cases := map[string]struct { maxSeriesLimit uint64 expectedErr string @@ -980,6 +988,8 @@ func TestBucketStore_LabelValues_e2e(t *testing.T) { } func TestBucketStore_LabelValues_SeriesLimiter_e2e(t *testing.T) { + t.Parallel() + cases := map[string]struct { maxSeriesLimit uint64 expectedErr string diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 8c5b5c61ee..ec3e0bee7e 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -63,19 +63,22 @@ import ( "github.com/thanos-io/thanos/pkg/store/storepb" storetestutil "github.com/thanos-io/thanos/pkg/store/storepb/testutil" "github.com/thanos-io/thanos/pkg/tenancy" - "github.com/thanos-io/thanos/pkg/testutil/custom" "github.com/thanos-io/thanos/pkg/testutil/e2eutil" ) var emptyRelabelConfig = make([]*relabel.Config, 0) func TestRawChunkReset(t *testing.T) { + t.Parallel() + r := rawChunk([]byte{1, 2}) r.Reset([]byte{3, 4}) testutil.Equals(t, []byte(r), []byte{3, 4}) } func TestBucketBlock_Property(t *testing.T) { + t.Parallel() + parameters := gopter.DefaultTestParameters() parameters.Rng.Seed(2000) parameters.MinSuccessfulTests = 20000 @@ -206,7 +209,7 @@ func TestBucketBlock_Property(t *testing.T) { } func TestBucketFilterExtLabelsMatchers(t *testing.T) { - defer custom.TolerantVerifyLeak(t) + t.Parallel() dir := t.TempDir() bkt, err := filesystem.NewBucket(dir) @@ -264,7 +267,7 @@ func TestBucketFilterExtLabelsMatchers(t *testing.T) { } func TestBucketBlock_matchLabels(t *testing.T) { - defer custom.TolerantVerifyLeak(t) + t.Parallel() dir := t.TempDir() @@ -360,7 +363,7 @@ func TestBucketBlock_matchLabels(t *testing.T) { } func TestBucketBlockSet_addGet(t *testing.T) { - defer custom.TolerantVerifyLeak(t) + t.Parallel() set := newBucketBlockSet(labels.Labels{}) @@ -471,7 +474,7 @@ func TestBucketBlockSet_addGet(t *testing.T) { } func TestBucketBlockSet_remove(t *testing.T) { - defer custom.TolerantVerifyLeak(t) + t.Parallel() set := newBucketBlockSet(labels.Labels{}) @@ -501,7 +504,7 @@ func TestBucketBlockSet_remove(t *testing.T) { } func TestBucketBlockSet_labelMatchers(t *testing.T) { - defer custom.TolerantVerifyLeak(t) + t.Parallel() set := newBucketBlockSet(labels.FromStrings("a", "b", "c", "d")) @@ -570,7 +573,7 @@ func TestBucketBlockSet_labelMatchers(t *testing.T) { } func TestGapBasedPartitioner_Partition(t *testing.T) { - defer custom.TolerantVerifyLeak(t) + t.Parallel() const maxGapSize = 1024 * 512 @@ -630,6 +633,8 @@ func TestGapBasedPartitioner_Partition(t *testing.T) { } func TestBucketStoreConfig_validate(t *testing.T) { + t.Parallel() + tests := map[string]struct { config *BucketStore expected error @@ -656,7 +661,7 @@ func TestBucketStoreConfig_validate(t *testing.T) { } func TestBucketStore_TSDBInfo(t *testing.T) { - defer custom.TolerantVerifyLeak(t) + t.Parallel() ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -761,6 +766,8 @@ func (r *recorder) GetRange(ctx context.Context, name string, off, length int64) } func TestBucketStore_Sharding(t *testing.T) { + t.Parallel() + ctx := context.Background() logger := log.NewNopLogger() @@ -1039,6 +1046,8 @@ func expectedTouchedBlockOps(all, expected, cached []ulid.ULID) []string { // Regression tests against: https://github.com/thanos-io/thanos/issues/1983. func TestReadIndexCache_LoadSeries(t *testing.T) { + t.Parallel() + bkt := objstore.NewInMemBucket() ctx := context.Background() @@ -1112,6 +1121,8 @@ func TestReadIndexCache_LoadSeries(t *testing.T) { } func TestBucketIndexReader_ExpandedPostings(t *testing.T) { + t.Parallel() + tb := testutil.NewTB(t) tmpDir := t.TempDir() @@ -1290,6 +1301,8 @@ func benchmarkExpandedPostings( } func TestExpandedPostingsEmptyPostings(t *testing.T) { + t.Parallel() + tmpDir := t.TempDir() bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) @@ -1324,6 +1337,8 @@ func TestExpandedPostingsEmptyPostings(t *testing.T) { } func TestLazyExpandedPostingsEmptyPostings(t *testing.T) { + t.Parallel() + tmpDir := t.TempDir() bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) @@ -1359,6 +1374,8 @@ func TestLazyExpandedPostingsEmptyPostings(t *testing.T) { } func TestBucketSeries(t *testing.T) { + t.Parallel() + tb := testutil.NewTB(t) storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) { benchBucketSeries(t, chunkenc.ValFloat, false, false, samplesPerSeries, series, 1) @@ -1366,6 +1383,8 @@ func TestBucketSeries(t *testing.T) { } func TestBucketSeriesLazyExpandedPostings(t *testing.T) { + t.Parallel() + tb := testutil.NewTB(t) storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) { benchBucketSeries(t, chunkenc.ValFloat, false, true, samplesPerSeries, series, 1) @@ -1373,6 +1392,8 @@ func TestBucketSeriesLazyExpandedPostings(t *testing.T) { } func TestBucketHistogramSeries(t *testing.T) { + t.Parallel() + tb := testutil.NewTB(t) storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) { benchBucketSeries(t, chunkenc.ValHistogram, false, false, samplesPerSeries, series, 1) @@ -1380,6 +1401,8 @@ func TestBucketHistogramSeries(t *testing.T) { } func TestBucketFloatHistogramSeries(t *testing.T) { + t.Parallel() + tb := testutil.NewTB(t) storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) { benchBucketSeries(t, chunkenc.ValFloatHistogram, false, false, samplesPerSeries, series, 1) @@ -1387,6 +1410,8 @@ func TestBucketFloatHistogramSeries(t *testing.T) { } func TestBucketSkipChunksSeries(t *testing.T) { + t.Parallel() + tb := testutil.NewTB(t) storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) { benchBucketSeries(t, chunkenc.ValFloat, true, false, samplesPerSeries, series, 1) @@ -1615,6 +1640,8 @@ func (m *mockedPool) Put(b *[]byte) { // Regression test against: https://github.com/thanos-io/thanos/issues/2147. func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { + t.Parallel() + tmpDir := t.TempDir() bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) @@ -1799,6 +1826,8 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { } func TestSeries_RequestAndResponseHints(t *testing.T) { + t.Parallel() + tb, store, seriesSet1, seriesSet2, block1, block2, close := setupStoreForHintsTest(t) defer close() @@ -1916,6 +1945,8 @@ func TestSeries_RequestAndResponseHints(t *testing.T) { } func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) { + t.Parallel() + tb := testutil.NewTB(t) tmpDir := t.TempDir() @@ -1977,6 +2008,8 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) { } func TestSeries_BlockWithMultipleChunks(t *testing.T) { + t.Parallel() + tb := testutil.NewTB(t) tmpDir := t.TempDir() @@ -2107,6 +2140,8 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) { } func TestSeries_SeriesSortedWithoutReplicaLabels(t *testing.T) { + t.Parallel() + tests := map[string]struct { series [][]labels.Labels replicaLabels []string @@ -2278,6 +2313,8 @@ func mustMarshalAny(pb proto.Message) *types.Any { } func TestBigEndianPostingsCount(t *testing.T) { + t.Parallel() + const count = 1000 raw := make([]byte, count*4) @@ -2406,6 +2443,8 @@ func setupStoreForHintsTest(t *testing.T) (testutil.TB, *BucketStore, []*storepb } func TestLabelNamesAndValuesHints(t *testing.T) { + t.Parallel() + _, store, seriesSet1, seriesSet2, block1, block2, close := setupStoreForHintsTest(t) defer close() @@ -2545,6 +2584,8 @@ func TestLabelNamesAndValuesHints(t *testing.T) { } func TestSeries_ChunksHaveHashRepresentation(t *testing.T) { + t.Parallel() + tb := testutil.NewTB(t) tmpDir := t.TempDir() @@ -2896,6 +2937,8 @@ func BenchmarkDownsampledBlockSeries(b *testing.B) { } func TestExpandPostingsWithContextCancel(t *testing.T) { + t.Parallel() + // Not enough number of postings to check context cancellation. p := index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5, 6, 7, 8}) ctx, cancel := context.WithCancel(context.Background()) @@ -2936,6 +2979,8 @@ func samePostingGroup(a, b *postingGroup) bool { } func TestMatchersToPostingGroup(t *testing.T) { + t.Parallel() + ctx := context.Background() for _, tc := range []struct { name string @@ -3291,6 +3336,8 @@ func TestMatchersToPostingGroup(t *testing.T) { } func TestPostingGroupMerge(t *testing.T) { + t.Parallel() + for _, tc := range []struct { name string group1 *postingGroup @@ -3417,6 +3464,8 @@ func TestPostingGroupMerge(t *testing.T) { // TestExpandedPostings is a test whether there is a race between multiple ExpandPostings() calls. func TestExpandedPostingsRace(t *testing.T) { + t.Parallel() + const blockCount = 10 tmpDir := t.TempDir() @@ -3485,7 +3534,7 @@ func TestExpandedPostingsRace(t *testing.T) { bucketBlocks = append(bucketBlocks, blk) } - tm, cancel := context.WithTimeout(context.Background(), 40*time.Second) + tm, cancel := context.WithTimeout(context.Background(), 10*time.Second) t.Cleanup(cancel) l := sync.Mutex{} @@ -3510,8 +3559,7 @@ func TestExpandedPostingsRace(t *testing.T) { wg := &sync.WaitGroup{} for i, bb := range bucketBlocks { wg.Add(1) - i := i - bb := bb + go func(i int, bb *bucketBlock) { refs, err := bb.indexReader(logger).ExpandedPostings(context.Background(), m, NewBytesLimiterFactory(0)(nil), false, dummyCounter, tenancy.DefaultTenant) testutil.Ok(t, err) @@ -3531,6 +3579,8 @@ func TestExpandedPostingsRace(t *testing.T) { } func TestBucketIndexReader_decodeCachedPostingsErrors(t *testing.T) { + t.Parallel() + bir := bucketIndexReader{stats: &queryStats{}} t.Run("should return error on broken cached postings without snappy prefix", func(t *testing.T) { _, _, err := bir.decodeCachedPostings([]byte("foo")) @@ -3543,6 +3593,8 @@ func TestBucketIndexReader_decodeCachedPostingsErrors(t *testing.T) { } func TestBucketStoreDedupOnBlockSeriesSet(t *testing.T) { + t.Parallel() + logger := log.NewNopLogger() tmpDir := t.TempDir() bktDir := filepath.Join(tmpDir, "bkt") @@ -3639,6 +3691,8 @@ func TestBucketStoreDedupOnBlockSeriesSet(t *testing.T) { } func TestQueryStatsMerge(t *testing.T) { + t.Parallel() + s := &queryStats{ blocksQueried: 1, postingsTouched: 1, @@ -3759,6 +3813,8 @@ func TestQueryStatsMerge(t *testing.T) { } func TestBucketStoreStreamingSeriesLimit(t *testing.T) { + t.Parallel() + logger := log.NewNopLogger() tmpDir := t.TempDir() bktDir := filepath.Join(tmpDir, "bkt") @@ -3901,6 +3957,8 @@ func (m *compositeBytesLimiterMock) ReserveWithType(num uint64, dataType StoreDa } func TestBucketStoreMetadataLimit(t *testing.T) { + t.Parallel() + tb := testutil.NewTB(t) tmpDir := t.TempDir() diff --git a/pkg/store/io_test.go b/pkg/store/io_test.go index af8e863be6..0303ec5006 100644 --- a/pkg/store/io_test.go +++ b/pkg/store/io_test.go @@ -14,6 +14,8 @@ import ( ) func TestByteRanges_contiguous(t *testing.T) { + t.Parallel() + tests := []struct { ranges byteRanges expected bool @@ -42,6 +44,8 @@ func TestByteRanges_contiguous(t *testing.T) { } func TestReadByteRanges(t *testing.T) { + t.Parallel() + tests := map[string]struct { src []byte ranges byteRanges diff --git a/pkg/store/lazy_postings_test.go b/pkg/store/lazy_postings_test.go index bec95ac413..06157affe0 100644 --- a/pkg/store/lazy_postings_test.go +++ b/pkg/store/lazy_postings_test.go @@ -25,6 +25,8 @@ import ( ) func TestKeysToFetchFromPostingGroups(t *testing.T) { + t.Parallel() + for _, tc := range []struct { name string pgs []*postingGroup @@ -253,6 +255,8 @@ func (h *mockIndexHeaderReader) LabelValues(name string) ([]string, error) { ret func (h *mockIndexHeaderReader) LabelNames() ([]string, error) { return nil, nil } func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) { + t.Parallel() + ctx := context.Background() dir := t.TempDir() bkt, err := filesystem.NewBucket(dir) diff --git a/pkg/store/limiter_test.go b/pkg/store/limiter_test.go index 860b4a59b3..b713f0574c 100644 --- a/pkg/store/limiter_test.go +++ b/pkg/store/limiter_test.go @@ -19,6 +19,8 @@ import ( ) func TestLimiter(t *testing.T) { + t.Parallel() + c := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) l := NewLimiter(10, c) @@ -36,6 +38,8 @@ func TestLimiter(t *testing.T) { } func TestRateLimitedServer(t *testing.T) { + t.Parallel() + numSamples := 60 series := []*storepb.SeriesResponse{ storeSeriesResponse(t, labels.FromStrings("series", "1"), makeSamples(numSamples)), diff --git a/pkg/store/postings_codec_test.go b/pkg/store/postings_codec_test.go index fd62359461..5b3dfe88ba 100644 --- a/pkg/store/postings_codec_test.go +++ b/pkg/store/postings_codec_test.go @@ -26,6 +26,8 @@ import ( ) func TestStreamedSnappyMaximumDecodedLen(t *testing.T) { + t.Parallel() + t.Run("compressed", func(t *testing.T) { b := make([]byte, 100) for i := 0; i < 100; i++ { @@ -70,6 +72,8 @@ func TestStreamedSnappyMaximumDecodedLen(t *testing.T) { } func TestDiffVarintCodec(t *testing.T) { + t.Parallel() + chunksDir := t.TempDir() headOpts := tsdb.DefaultHeadOptions() @@ -343,6 +347,8 @@ func FuzzSnappyStreamEncoding(f *testing.F) { } func TestRegressionIssue6545(t *testing.T) { + t.Parallel() + diffVarintPostings, err := os.ReadFile("6545postingsrepro") testutil.Ok(t, err) diff --git a/pkg/store/prometheus_test.go b/pkg/store/prometheus_test.go index e754ef4a15..8fc6f1ef33 100644 --- a/pkg/store/prometheus_test.go +++ b/pkg/store/prometheus_test.go @@ -26,22 +26,23 @@ import ( "github.com/thanos-io/thanos/pkg/promclient" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" - "github.com/thanos-io/thanos/pkg/testutil/custom" "github.com/thanos-io/thanos/pkg/testutil/e2eutil" ) func TestPrometheusStore_Series_e2e(t *testing.T) { + t.Parallel() + testPrometheusStoreSeriesE2e(t, "") } // Regression test for https://github.com/thanos-io/thanos/issues/478. func TestPrometheusStore_Series_promOnPath_e2e(t *testing.T) { + t.Parallel() + testPrometheusStoreSeriesE2e(t, "/prometheus/sub/path") } func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) { - defer custom.TolerantVerifyLeak(t) - p, err := e2eutil.NewPrometheusOnPath(prefix) testutil.Ok(t, err) defer func() { testutil.Ok(t, p.Stop()) }() @@ -170,7 +171,7 @@ func expandChunk(cit chunkenc.Iterator) (res []sample) { } func TestPrometheusStore_SeriesLabels_e2e(t *testing.T) { - defer custom.TolerantVerifyLeak(t) + t.Parallel() p, err := e2eutil.NewPrometheus() testutil.Ok(t, err) @@ -343,7 +344,7 @@ func TestPrometheusStore_SeriesLabels_e2e(t *testing.T) { } func TestPrometheusStore_Series_MatchExternalLabel(t *testing.T) { - defer custom.TolerantVerifyLeak(t) + t.Parallel() p, err := e2eutil.NewPrometheus() testutil.Ok(t, err) @@ -406,7 +407,7 @@ func TestPrometheusStore_Series_MatchExternalLabel(t *testing.T) { } func TestPrometheusStore_Series_ChunkHashCalculation_Integration(t *testing.T) { - defer custom.TolerantVerifyLeak(t) + t.Parallel() p, err := e2eutil.NewPrometheus() testutil.Ok(t, err) diff --git a/pkg/store/proxy_merge_test.go b/pkg/store/proxy_merge_test.go index 96c17a87a8..387d5ff6a8 100644 --- a/pkg/store/proxy_merge_test.go +++ b/pkg/store/proxy_merge_test.go @@ -16,6 +16,8 @@ import ( ) func TestRmLabelsCornerCases(t *testing.T) { + t.Parallel() + testutil.Equals(t, rmLabels(labelsFromStrings("aa", "bb"), map[string]struct{}{ "aa": {}, }), labels.Labels{}) @@ -25,6 +27,8 @@ func TestRmLabelsCornerCases(t *testing.T) { } func TestProxyResponseTreeSort(t *testing.T) { + t.Parallel() + for _, tcase := range []struct { title string input []respSet @@ -238,6 +242,8 @@ type nopClientSendCloser struct { func (c nopClientSendCloser) CloseSend() error { return nil } func TestSortWithoutLabels(t *testing.T) { + t.Parallel() + for _, tcase := range []struct { input []*storepb.SeriesResponse exp []*storepb.SeriesResponse diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index 215dc9515b..1389e89b1d 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -37,7 +37,6 @@ import ( "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" storetestutil "github.com/thanos-io/thanos/pkg/store/storepb/testutil" - "github.com/thanos-io/thanos/pkg/testutil/custom" ) type mockedSeriesServer struct { @@ -60,6 +59,8 @@ type mockedStartTimeDB struct { func (db *mockedStartTimeDB) StartTime() (int64, error) { return db.startTime, nil } func TestProxyStore_TSDBInfos(t *testing.T) { + t.Parallel() + stores := []Client{ &storetestutil.TestClient{ StoreTSDBInfos: nil, @@ -88,7 +89,7 @@ func TestProxyStore_TSDBInfos(t *testing.T) { } func TestProxyStore_Series(t *testing.T) { - defer custom.TolerantVerifyLeak(t) + t.Parallel() for _, tc := range []struct { title string @@ -807,13 +808,13 @@ func TestProxyStore_Series(t *testing.T) { } func TestProxyStore_SeriesSlowStores(t *testing.T) { + t.Parallel() + enable := os.Getenv("THANOS_ENABLE_STORE_READ_TIMEOUT_TESTS") if enable == "" { t.Skip("enable THANOS_ENABLE_STORE_READ_TIMEOUT_TESTS to run store-read-timeout tests") } - defer custom.TolerantVerifyLeak(t) - for _, tc := range []struct { title string storeAPIs []Client @@ -1350,7 +1351,7 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) { } func TestProxyStore_Series_RequestParamsProxied(t *testing.T) { - defer custom.TolerantVerifyLeak(t) + t.Parallel() m := &mockedStoreAPI{ RespSeries: []*storepb.SeriesResponse{ @@ -1394,7 +1395,7 @@ func TestProxyStore_Series_RequestParamsProxied(t *testing.T) { } func TestProxyStore_Series_RegressionFillResponseChannel(t *testing.T) { - defer custom.TolerantVerifyLeak(t) + t.Parallel() var cls []Client for i := 0; i < 10; i++ { @@ -1450,7 +1451,7 @@ func TestProxyStore_Series_RegressionFillResponseChannel(t *testing.T) { } func TestProxyStore_LabelValues(t *testing.T) { - defer custom.TolerantVerifyLeak(t) + t.Parallel() m1 := &mockedStoreAPI{ RespLabelValues: &storepb.LabelValuesResponse{ @@ -1552,7 +1553,7 @@ func TestProxyStore_LabelValues(t *testing.T) { } func TestProxyStore_LabelNames(t *testing.T) { - defer custom.TolerantVerifyLeak(t) + t.Parallel() for _, tc := range []struct { title string @@ -1806,6 +1807,8 @@ func seriesEquals(t *testing.T, expected []rawSeries, got []storepb.Series) { } func TestStoreMatches(t *testing.T) { + t.Parallel() + for _, c := range []struct { s Client mint, maxt int64 @@ -2068,6 +2071,8 @@ func storeSeriesResponse(t testing.TB, lset labels.Labels, smplChunks ...[]sampl } func TestProxySeries(t *testing.T) { + t.Parallel() + tb := testutil.NewTB(t) storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) { benchProxySeries(t, samplesPerSeries, series) @@ -2213,7 +2218,7 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) { } func TestProxyStore_NotLeakingOnPrematureFinish(t *testing.T) { - defer custom.TolerantVerifyLeak(t) + t.Parallel() logger := log.NewNopLogger() @@ -2338,6 +2343,8 @@ func (m *storeServerStub) Series(_ *storepb.SeriesRequest, server storepb.Store_ } func TestProxyStore_storeMatchMetadata(t *testing.T) { + t.Parallel() + c := storetestutil.TestClient{Name: "testaddr"} c.IsLocalStore = true diff --git a/pkg/store/recover_test.go b/pkg/store/recover_test.go index c71111c556..c9e01ec7dc 100644 --- a/pkg/store/recover_test.go +++ b/pkg/store/recover_test.go @@ -15,6 +15,8 @@ import ( ) func TestRecoverableServer(t *testing.T) { + t.Parallel() + logger := log.NewNopLogger() store := NewRecoverableStoreServer(logger, &panicStoreServer{}) diff --git a/pkg/store/tsdb_selector_test.go b/pkg/store/tsdb_selector_test.go index 745c4edb2e..ea4d1e8c36 100644 --- a/pkg/store/tsdb_selector_test.go +++ b/pkg/store/tsdb_selector_test.go @@ -14,6 +14,8 @@ import ( ) func TestMatchersForLabelSets(t *testing.T) { + t.Parallel() + tests := []struct { name string labelSets []labels.Labels diff --git a/pkg/store/tsdb_test.go b/pkg/store/tsdb_test.go index 319435ef1b..74b203a69f 100644 --- a/pkg/store/tsdb_test.go +++ b/pkg/store/tsdb_test.go @@ -486,11 +486,9 @@ func TestTSDBStore_SeriesAccessWithoutDelegateClosing(t *testing.T) { } func TestTSDBStoreSeries(t *testing.T) { - tb := testutil.NewTB(t) - // Make sure there are more samples, so we can check framing code. - storetestutil.RunSeriesInterestingCases(tb, 10e6, 200e3, func(t testutil.TB, samplesPerSeries, series int) { - benchTSDBStoreSeries(t, samplesPerSeries, series) - }) + t.Parallel() + + benchTSDBStoreSeries(testutil.NewTB(t), 10_000, 1) } func BenchmarkTSDBStoreSeries(b *testing.B) {