Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into jburnham/ruler-disabl…
Browse files Browse the repository at this point in the history
…e-x-scope-orgid
  • Loading branch information
jburnham committed Feb 13, 2024
2 parents 36918e0 + b2e4cc3 commit 80e6039
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 7 deletions.
2 changes: 1 addition & 1 deletion pkg/querier/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (q *QuerierAPI) TailHandler(w http.ResponseWriter, r *http.Request) {
}
level.Error(logger).Log("msg", "Error from client", "err", err)
break
} else if tailer.stopped {
} else if tailer.stopped.Load() {
return
}

Expand Down
15 changes: 10 additions & 5 deletions pkg/querier/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"sync"
"time"

"go.uber.org/atomic"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
Expand Down Expand Up @@ -51,7 +53,7 @@ type Tailer struct {
querierTailClients map[string]logproto.Querier_TailClient // addr -> grpc clients for tailing logs from ingesters
querierTailClientsMtx sync.RWMutex

stopped bool
stopped atomic.Bool
delayFor time.Duration
responseChan chan *loghttp.TailResponse
closeErrChan chan error
Expand Down Expand Up @@ -85,7 +87,8 @@ func (t *Tailer) loop() {

droppedEntries := make([]loghttp.DroppedEntry, 0)

for !t.stopped {
stopped := t.stopped.Load()
for !stopped {
select {
case <-checkConnectionTicker.C:
// Try to reconnect dropped ingesters and connect to new ingesters
Expand Down Expand Up @@ -214,7 +217,8 @@ func (t *Tailer) readTailClient(addr string, querierTailClient logproto.Querier_

logger := util_log.WithContext(querierTailClient.Context(), t.logger)
for {
if t.stopped {
stopped := t.stopped.Load()
if stopped {
if err := querierTailClient.CloseSend(); err != nil {
level.Error(logger).Log("msg", "Error closing grpc tail client", "err", err)
}
Expand All @@ -223,7 +227,7 @@ func (t *Tailer) readTailClient(addr string, querierTailClient logproto.Querier_
resp, err = querierTailClient.Recv()
if err != nil {
// We don't want to log error when its due to stopping the tail request
if !t.stopped {
if !stopped {
level.Error(logger).Log("msg", "Error receiving response from grpc tail client", "err", err)
}
break
Expand Down Expand Up @@ -269,7 +273,8 @@ func (t *Tailer) close() error {
t.metrics.tailsActive.Dec()
t.metrics.tailedStreamsActive.Sub(t.activeStreamCount())

t.stopped = true
t.stopped.Store(true)

return t.openStreamIterator.Close()
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/tail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ func readFromTailer(tailer *Tailer, maxEntries int) ([]*loghttp.TailResponse, er
timeoutTicker := time.NewTicker(timeout)
defer timeoutTicker.Stop()

for !tailer.stopped && entriesCount < maxEntries {
for !tailer.stopped.Load() && entriesCount < maxEntries {
select {
case <-timeoutTicker.C:
return nil, errors.New("timeout expired while reading responses from Tailer")
Expand Down

0 comments on commit 80e6039

Please sign in to comment.