diff --git a/pkg/querier/http.go b/pkg/querier/http.go index dc29c2f61e04f..a508bf9f7286b 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -199,7 +199,7 @@ func (q *QuerierAPI) TailHandler(w http.ResponseWriter, r *http.Request) { } level.Error(logger).Log("msg", "Error from client", "err", err) break - } else if tailer.stopped { + } else if tailer.stopped.Load() { return } diff --git a/pkg/querier/tail.go b/pkg/querier/tail.go index 1b3cfd5fcb8c0..35cb4bc18e7a7 100644 --- a/pkg/querier/tail.go +++ b/pkg/querier/tail.go @@ -6,6 +6,8 @@ import ( "sync" "time" + "go.uber.org/atomic" + "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/pkg/errors" @@ -51,7 +53,7 @@ type Tailer struct { querierTailClients map[string]logproto.Querier_TailClient // addr -> grpc clients for tailing logs from ingesters querierTailClientsMtx sync.RWMutex - stopped bool + stopped atomic.Bool delayFor time.Duration responseChan chan *loghttp.TailResponse closeErrChan chan error @@ -85,7 +87,8 @@ func (t *Tailer) loop() { droppedEntries := make([]loghttp.DroppedEntry, 0) - for !t.stopped { + stopped := t.stopped.Load() + for !stopped { select { case <-checkConnectionTicker.C: // Try to reconnect dropped ingesters and connect to new ingesters @@ -214,7 +217,8 @@ func (t *Tailer) readTailClient(addr string, querierTailClient logproto.Querier_ logger := util_log.WithContext(querierTailClient.Context(), t.logger) for { - if t.stopped { + stopped := t.stopped.Load() + if stopped { if err := querierTailClient.CloseSend(); err != nil { level.Error(logger).Log("msg", "Error closing grpc tail client", "err", err) } @@ -223,7 +227,7 @@ func (t *Tailer) readTailClient(addr string, querierTailClient logproto.Querier_ resp, err = querierTailClient.Recv() if err != nil { // We don't want to log error when its due to stopping the tail request - if !t.stopped { + if !stopped { level.Error(logger).Log("msg", "Error receiving response from grpc tail client", "err", err) } break @@ -269,7 +273,8 @@ func (t *Tailer) close() error { t.metrics.tailsActive.Dec() t.metrics.tailedStreamsActive.Sub(t.activeStreamCount()) - t.stopped = true + t.stopped.Store(true) + return t.openStreamIterator.Close() } diff --git a/pkg/querier/tail_test.go b/pkg/querier/tail_test.go index d0b17ea126e2a..07d3743af03c5 100644 --- a/pkg/querier/tail_test.go +++ b/pkg/querier/tail_test.go @@ -389,7 +389,7 @@ func readFromTailer(tailer *Tailer, maxEntries int) ([]*loghttp.TailResponse, er timeoutTicker := time.NewTicker(timeout) defer timeoutTicker.Stop() - for !tailer.stopped && entriesCount < maxEntries { + for !tailer.stopped.Load() && entriesCount < maxEntries { select { case <-timeoutTicker.C: return nil, errors.New("timeout expired while reading responses from Tailer")