Skip to content

Commit

Permalink
tail queue
Browse files Browse the repository at this point in the history
  • Loading branch information
salvacorts committed Feb 16, 2024
1 parent f158b5b commit 9e39142
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 30 deletions.
38 changes: 18 additions & 20 deletions pkg/ingester/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,30 +288,28 @@ func (s *stream) recordAndSendToTailers(record *wal.Record, entries []logproto.E
hasTailers := len(s.tailers) != 0
s.tailerMtx.RUnlock()
if hasTailers {
go func() {
stream := logproto.Stream{Labels: s.labelsString, Entries: entries}

closedTailers := []uint32{}

s.tailerMtx.RLock()
for _, tailer := range s.tailers {
if tailer.isClosed() {
closedTailers = append(closedTailers, tailer.getID())
continue
}
tailer.send(stream, s.labels)
stream := logproto.Stream{Labels: s.labelsString, Entries: entries}

closedTailers := []uint32{}

s.tailerMtx.RLock()
for _, tailer := range s.tailers {
if tailer.isClosed() {
closedTailers = append(closedTailers, tailer.getID())
continue
}
s.tailerMtx.RUnlock()
tailer.send(stream, s.labels)
}
s.tailerMtx.RUnlock()

if len(closedTailers) != 0 {
s.tailerMtx.Lock()
defer s.tailerMtx.Unlock()
if len(closedTailers) != 0 {
s.tailerMtx.Lock()
defer s.tailerMtx.Unlock()

for _, closedTailerID := range closedTailers {
delete(s.tailers, closedTailerID)
}
for _, closedTailerID := range closedTailers {
delete(s.tailers, closedTailerID)
}
}()
}
}
}

Expand Down
64 changes: 54 additions & 10 deletions pkg/ingester/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,29 @@ import (
util_log "github.com/grafana/loki/pkg/util/log"
)

const bufferSizeForTailResponse = 5
const (
bufferSizeForTailResponse = 5
bufferSizeForTailStream = 100
)

type TailServer interface {
Send(*logproto.TailResponse) error
Context() context.Context
}

type tailRequest struct {
stream logproto.Stream
lbs labels.Labels
}

type tailer struct {
id uint32
orgID string
matchers []*labels.Matcher
pipeline syntax.Pipeline
pipelineMtx sync.Mutex

queue chan tailRequest
sendChan chan *logproto.Stream

// Signaling channel used to notify once the tailer gets closed
Expand Down Expand Up @@ -59,6 +68,7 @@ func newTailer(orgID string, expr syntax.LogSelectorExpr, conn TailServer, maxDr
orgID: orgID,
matchers: matchers,
sendChan: make(chan *logproto.Stream, bufferSizeForTailResponse),
queue: make(chan tailRequest, bufferSizeForTailStream),
conn: conn,
droppedStreams: make([]*logproto.DroppedStream, 0, maxDroppedStreams),
maxDroppedStreams: maxDroppedStreams,
Expand All @@ -73,6 +83,9 @@ func (t *tailer) loop() {
var err error
var ok bool

// Launch a go routine to receive streams sent with t.send
go t.receiveStreamsLoop()

for {
select {
case <-t.conn.Context().Done():
Expand Down Expand Up @@ -102,6 +115,37 @@ func (t *tailer) loop() {
}
}

func (t *tailer) receiveStreamsLoop() {
for {
select {
case <-t.conn.Context().Done():
t.close()
return
case <-t.closeChan:
return
case req, ok := <-t.queue:
if !ok {
return
}

streams := t.processStream(req.stream, req.lbs)
if len(streams) == 0 {
continue
}

for _, s := range streams {
select {
case t.sendChan <- s:
default:
t.dropStream(*s)
}
}
}
}
}

// send sends a stream to the tailer for processing and sending to the client.
// It will drop the stream if the tailer is blocked or the queue is full.
func (t *tailer) send(stream logproto.Stream, lbs labels.Labels) {
if t.isClosed() {
return
Expand All @@ -117,16 +161,16 @@ func (t *tailer) send(stream logproto.Stream, lbs labels.Labels) {
return
}

streams := t.processStream(stream, lbs)
if len(streams) == 0 {
return
// Send stream to queue for processing asynchronously
// If the queue is full, drop the stream
req := tailRequest{
stream: stream,
lbs: lbs,
}
for _, s := range streams {
select {
case t.sendChan <- s:
default:
t.dropStream(*s)
}
select {
case t.queue <- req:
default:
t.dropStream(stream)
}
}

Expand Down
50 changes: 50 additions & 0 deletions pkg/ingester/tailer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ingester

import (
"context"
"fmt"
"math/rand"
"sync"
"testing"
Expand All @@ -15,6 +16,55 @@ import (
"github.com/grafana/loki/pkg/logql/syntax"
)

func TestTailer_RoundTrip(t *testing.T) {
server := &fakeTailServer{}

lbs := makeRandomLabels()
expr, err := syntax.ParseLogSelector(lbs.String(), true)
require.NoError(t, err)
tail, err := newTailer("org-id", expr, server, 10)
require.NoError(t, err)
var wg sync.WaitGroup
wg.Add(1)
go func() {
tail.loop()
wg.Done()
}()

const numStreams = 1000
var entries []logproto.Entry
for i := 0; i < numStreams; i += 3 {
var iterEntries []logproto.Entry
for j := 0; j < 3; j++ {
iterEntries = append(iterEntries, logproto.Entry{Timestamp: time.Unix(0, int64(i+j)), Line: fmt.Sprintf("line %d", i+j)})
}
entries = append(entries, iterEntries...)

tail.send(logproto.Stream{
Labels: lbs.String(),
Entries: iterEntries,
}, lbs)

// sleep a bit to allow the tailer to process the stream without dropping
// This should take about 5 seconds to process all the streams
time.Sleep(5 * time.Millisecond)
}

// Wait for the stream to be received by the server.
require.Eventually(t, func() bool {
return len(server.GetResponses()) > 0
}, 30*time.Second, 1*time.Second, "stream was not received")

var processedEntries []logproto.Entry
for _, response := range server.GetResponses() {
processedEntries = append(processedEntries, response.Stream.Entries...)
}
require.ElementsMatch(t, entries, processedEntries)

tail.close()
wg.Wait()
}

func TestTailer_sendRaceConditionOnSendWhileClosing(t *testing.T) {
runs := 100

Expand Down

0 comments on commit 9e39142

Please sign in to comment.