Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tail queue #11930

Merged
merged 2 commits into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 18 additions & 20 deletions pkg/ingester/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,30 +288,28 @@ func (s *stream) recordAndSendToTailers(record *wal.Record, entries []logproto.E
hasTailers := len(s.tailers) != 0
s.tailerMtx.RUnlock()
if hasTailers {
go func() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm I don't understand the surrounding code, but from the recordAndSendToTailers() name and from the previous usage of a goroutine here, I assume this part should be as non-blocking as possible? If so, does it make sense to keep the goroutine here? We'd still handle the actual sending over a queue, but this part won't be blocking either.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are removing the goroutine and sending the stream to the queue. Since the queue is a buffered channel, it's a non-blocking opt (if chan is full, the stream is dropped)

stream := logproto.Stream{Labels: s.labelsString, Entries: entries}

closedTailers := []uint32{}

s.tailerMtx.RLock()
for _, tailer := range s.tailers {
if tailer.isClosed() {
closedTailers = append(closedTailers, tailer.getID())
continue
}
tailer.send(stream, s.labels)
stream := logproto.Stream{Labels: s.labelsString, Entries: entries}

closedTailers := []uint32{}

s.tailerMtx.RLock()
for _, tailer := range s.tailers {
if tailer.isClosed() {
closedTailers = append(closedTailers, tailer.getID())
continue
}
s.tailerMtx.RUnlock()
tailer.send(stream, s.labels)
}
s.tailerMtx.RUnlock()

if len(closedTailers) != 0 {
s.tailerMtx.Lock()
defer s.tailerMtx.Unlock()
if len(closedTailers) != 0 {
s.tailerMtx.Lock()
defer s.tailerMtx.Unlock()

for _, closedTailerID := range closedTailers {
delete(s.tailers, closedTailerID)
}
for _, closedTailerID := range closedTailers {
delete(s.tailers, closedTailerID)
}
}()
}
}
}

Expand Down
64 changes: 54 additions & 10 deletions pkg/ingester/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,29 @@ import (
util_log "github.com/grafana/loki/pkg/util/log"
)

const bufferSizeForTailResponse = 5
const (
bufferSizeForTailResponse = 5
bufferSizeForTailStream = 100
)

type TailServer interface {
Send(*logproto.TailResponse) error
Context() context.Context
}

type tailRequest struct {
stream logproto.Stream
lbs labels.Labels
}

type tailer struct {
id uint32
orgID string
matchers []*labels.Matcher
pipeline syntax.Pipeline
pipelineMtx sync.Mutex

queue chan tailRequest
sendChan chan *logproto.Stream

// Signaling channel used to notify once the tailer gets closed
Expand Down Expand Up @@ -59,6 +68,7 @@ func newTailer(orgID string, expr syntax.LogSelectorExpr, conn TailServer, maxDr
orgID: orgID,
matchers: matchers,
sendChan: make(chan *logproto.Stream, bufferSizeForTailResponse),
queue: make(chan tailRequest, bufferSizeForTailStream),
conn: conn,
droppedStreams: make([]*logproto.DroppedStream, 0, maxDroppedStreams),
maxDroppedStreams: maxDroppedStreams,
Expand All @@ -73,6 +83,9 @@ func (t *tailer) loop() {
var err error
var ok bool

// Launch a go routine to receive streams sent with t.send
go t.receiveStreamsLoop()

for {
select {
case <-t.conn.Context().Done():
Expand Down Expand Up @@ -102,6 +115,37 @@ func (t *tailer) loop() {
}
}

func (t *tailer) receiveStreamsLoop() {
defer t.close()
for {
select {
case <-t.conn.Context().Done():
return
case <-t.closeChan:
return
case req, ok := <-t.queue:
if !ok {
return
}

streams := t.processStream(req.stream, req.lbs)
if len(streams) == 0 {
continue
}

for _, s := range streams {
select {
case t.sendChan <- s:
default:
t.dropStream(*s)
}
}
}
}
}

// send sends a stream to the tailer for processing and sending to the client.
// It will drop the stream if the tailer is blocked or the queue is full.
func (t *tailer) send(stream logproto.Stream, lbs labels.Labels) {
if t.isClosed() {
return
Expand All @@ -117,16 +161,16 @@ func (t *tailer) send(stream logproto.Stream, lbs labels.Labels) {
return
}

streams := t.processStream(stream, lbs)
if len(streams) == 0 {
return
// Send stream to queue for processing asynchronously
// If the queue is full, drop the stream
req := tailRequest{
stream: stream,
lbs: lbs,
}
for _, s := range streams {
select {
case t.sendChan <- s:
default:
t.dropStream(*s)
}
select {
case t.queue <- req:
default:
t.dropStream(stream)
}
}

Expand Down
50 changes: 50 additions & 0 deletions pkg/ingester/tailer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ingester

import (
"context"
"fmt"
"math/rand"
"sync"
"testing"
Expand All @@ -15,6 +16,55 @@ import (
"github.com/grafana/loki/pkg/logql/syntax"
)

func TestTailer_RoundTrip(t *testing.T) {
server := &fakeTailServer{}

lbs := makeRandomLabels()
expr, err := syntax.ParseLogSelector(lbs.String(), true)
require.NoError(t, err)
tail, err := newTailer("org-id", expr, server, 10)
require.NoError(t, err)
var wg sync.WaitGroup
wg.Add(1)
go func() {
tail.loop()
wg.Done()
}()

const numStreams = 1000
var entries []logproto.Entry
for i := 0; i < numStreams; i += 3 {
var iterEntries []logproto.Entry
for j := 0; j < 3; j++ {
iterEntries = append(iterEntries, logproto.Entry{Timestamp: time.Unix(0, int64(i+j)), Line: fmt.Sprintf("line %d", i+j)})
}
entries = append(entries, iterEntries...)

tail.send(logproto.Stream{
Labels: lbs.String(),
Entries: iterEntries,
}, lbs)

// sleep a bit to allow the tailer to process the stream without dropping
// This should take about 5 seconds to process all the streams
time.Sleep(5 * time.Millisecond)
}

// Wait for the stream to be received by the server.
require.Eventually(t, func() bool {
return len(server.GetResponses()) > 0
}, 30*time.Second, 1*time.Second, "stream was not received")

var processedEntries []logproto.Entry
for _, response := range server.GetResponses() {
processedEntries = append(processedEntries, response.Stream.Entries...)
}
require.ElementsMatch(t, entries, processedEntries)

tail.close()
wg.Wait()
}

func TestTailer_sendRaceConditionOnSendWhileClosing(t *testing.T) {
runs := 100

Expand Down
Loading