diff --git a/pkg/querier/http.go b/pkg/querier/http.go index dc29c2f61e04f..a508bf9f7286b 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -199,7 +199,7 @@ func (q *QuerierAPI) TailHandler(w http.ResponseWriter, r *http.Request) { } level.Error(logger).Log("msg", "Error from client", "err", err) break - } else if tailer.stopped { + } else if tailer.stopped.Load() { return } diff --git a/pkg/querier/tail.go b/pkg/querier/tail.go index fde091b2c8e64..d26fafbe440f8 100644 --- a/pkg/querier/tail.go +++ b/pkg/querier/tail.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "time" "github.com/go-kit/log" @@ -51,8 +52,7 @@ type Tailer struct { querierTailClients map[string]logproto.Querier_TailClient // addr -> grpc clients for tailing logs from ingesters querierTailClientsMtx sync.RWMutex - stopped bool - stoppedMtx sync.RWMutex + stopped atomic.Bool delayFor time.Duration responseChan chan *loghttp.TailResponse closeErrChan chan error @@ -86,9 +86,7 @@ func (t *Tailer) loop() { droppedEntries := make([]loghttp.DroppedEntry, 0) - t.stoppedMtx.RLock() - stopped := t.stopped - t.stoppedMtx.RUnlock() + stopped := t.stopped.Load() for !stopped { select { case <-checkConnectionTicker.C: @@ -218,9 +216,7 @@ func (t *Tailer) readTailClient(addr string, querierTailClient logproto.Querier_ logger := util_log.WithContext(querierTailClient.Context(), t.logger) for { - t.stoppedMtx.RLock() - stopped := t.stopped - t.stoppedMtx.RUnlock() + stopped := t.stopped.Load() if stopped { if err := querierTailClient.CloseSend(); err != nil { level.Error(logger).Log("msg", "Error closing grpc tail client", "err", err) @@ -276,9 +272,7 @@ func (t *Tailer) close() error { t.metrics.tailsActive.Dec() t.metrics.tailedStreamsActive.Sub(t.activeStreamCount()) - t.stoppedMtx.Lock() - t.stopped = true - t.stoppedMtx.Unlock() + t.stopped.Store(true) return t.openStreamIterator.Close() } diff --git a/pkg/querier/tail_test.go b/pkg/querier/tail_test.go index d0b17ea126e2a..07d3743af03c5 100644 --- a/pkg/querier/tail_test.go +++ b/pkg/querier/tail_test.go @@ -389,7 +389,7 @@ func readFromTailer(tailer *Tailer, maxEntries int) ([]*loghttp.TailResponse, er timeoutTicker := time.NewTicker(timeout) defer timeoutTicker.Stop() - for !tailer.stopped && entriesCount < maxEntries { + for !tailer.stopped.Load() && entriesCount < maxEntries { select { case <-timeoutTicker.C: return nil, errors.New("timeout expired while reading responses from Tailer")