Skip to content

Commit

Permalink
Use atomic.Bool instead of mutex for clarity and performance
Browse files Browse the repository at this point in the history
  • Loading branch information
paul1r committed Feb 13, 2024
1 parent 9f1074c commit 97b5235
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 13 deletions.
2 changes: 1 addition & 1 deletion pkg/querier/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (q *QuerierAPI) TailHandler(w http.ResponseWriter, r *http.Request) {
}
level.Error(logger).Log("msg", "Error from client", "err", err)
break
} else if tailer.stopped {
} else if tailer.stopped.Load() {
return
}

Expand Down
16 changes: 5 additions & 11 deletions pkg/querier/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/go-kit/log"
Expand Down Expand Up @@ -51,8 +52,7 @@ type Tailer struct {
querierTailClients map[string]logproto.Querier_TailClient // addr -> grpc clients for tailing logs from ingesters
querierTailClientsMtx sync.RWMutex

stopped bool
stoppedMtx sync.RWMutex
stopped atomic.Bool
delayFor time.Duration
responseChan chan *loghttp.TailResponse
closeErrChan chan error
Expand Down Expand Up @@ -86,9 +86,7 @@ func (t *Tailer) loop() {

droppedEntries := make([]loghttp.DroppedEntry, 0)

t.stoppedMtx.RLock()
stopped := t.stopped
t.stoppedMtx.RUnlock()
stopped := t.stopped.Load()
for !stopped {
select {
case <-checkConnectionTicker.C:
Expand Down Expand Up @@ -218,9 +216,7 @@ func (t *Tailer) readTailClient(addr string, querierTailClient logproto.Querier_

logger := util_log.WithContext(querierTailClient.Context(), t.logger)
for {
t.stoppedMtx.RLock()
stopped := t.stopped
t.stoppedMtx.RUnlock()
stopped := t.stopped.Load()
if stopped {
if err := querierTailClient.CloseSend(); err != nil {
level.Error(logger).Log("msg", "Error closing grpc tail client", "err", err)
Expand Down Expand Up @@ -276,9 +272,7 @@ func (t *Tailer) close() error {
t.metrics.tailsActive.Dec()
t.metrics.tailedStreamsActive.Sub(t.activeStreamCount())

t.stoppedMtx.Lock()
t.stopped = true
t.stoppedMtx.Unlock()
t.stopped.Store(true)

return t.openStreamIterator.Close()
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/tail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ func readFromTailer(tailer *Tailer, maxEntries int) ([]*loghttp.TailResponse, er
timeoutTicker := time.NewTicker(timeout)
defer timeoutTicker.Stop()

for !tailer.stopped && entriesCount < maxEntries {
for !tailer.stopped.Load() && entriesCount < maxEntries {
select {
case <-timeoutTicker.C:
return nil, errors.New("timeout expired while reading responses from Tailer")
Expand Down

0 comments on commit 97b5235

Please sign in to comment.