diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 9074580b4eb40..ebaa4b1904dc8 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -11,6 +11,8 @@ import ( "testing" "time" + "go.uber.org/atomic" + "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/backoff" @@ -1498,7 +1500,7 @@ func jsonLine(ts int64, i int) string { type readRingMock struct { replicationSet ring.ReplicationSet - getAllHealthyCallsCount int + getAllHealthyCallsCount atomic.Int32 tokenRangesByIngester map[string]ring.TokenRanges } @@ -1538,7 +1540,7 @@ func (r *readRingMock) BatchGet(_ []uint32, _ ring.Operation) ([]ring.Replicatio } func (r *readRingMock) GetAllHealthy(_ ring.Operation) (ring.ReplicationSet, error) { - r.getAllHealthyCallsCount++ + r.getAllHealthyCallsCount.Add(1) return r.replicationSet, nil } diff --git a/pkg/ingester/recalculate_owned_streams_test.go b/pkg/ingester/recalculate_owned_streams_test.go index 4903375568057..3e531dcdef66f 100644 --- a/pkg/ingester/recalculate_owned_streams_test.go +++ b/pkg/ingester/recalculate_owned_streams_test.go @@ -25,12 +25,12 @@ func Test_recalculateOwnedStreams_newRecalculateOwnedStreamsIngester(t *testing. }, 0) strategy := newOwnedStreamsIngesterStrategy("test", mockRing, log.NewNopLogger()) service := newRecalculateOwnedStreamsSvc(mockInstancesSupplier.get, strategy, 50*time.Millisecond, log.NewNopLogger()) - require.Equal(t, 0, mockRing.getAllHealthyCallsCount, "ring must be called only after service's start up") + require.Equal(t, int32(0), mockRing.getAllHealthyCallsCount.Load(), "ring must be called only after service's start up") ctx := context.Background() require.NoError(t, service.StartAsync(ctx)) require.NoError(t, service.AwaitRunning(ctx)) require.Eventually(t, func() bool { - return mockRing.getAllHealthyCallsCount >= 2 + return mockRing.getAllHealthyCallsCount.Load() >= 2 }, 1*time.Second, 50*time.Millisecond, "expected at least two runs of the iteration") }