diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index 4c6aa4f9a122e..81ce436929251 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -288,30 +288,28 @@ func (s *stream) recordAndSendToTailers(record *wal.Record, entries []logproto.E hasTailers := len(s.tailers) != 0 s.tailerMtx.RUnlock() if hasTailers { - go func() { - stream := logproto.Stream{Labels: s.labelsString, Entries: entries} - - closedTailers := []uint32{} - - s.tailerMtx.RLock() - for _, tailer := range s.tailers { - if tailer.isClosed() { - closedTailers = append(closedTailers, tailer.getID()) - continue - } - tailer.send(stream, s.labels) + stream := logproto.Stream{Labels: s.labelsString, Entries: entries} + + closedTailers := []uint32{} + + s.tailerMtx.RLock() + for _, tailer := range s.tailers { + if tailer.isClosed() { + closedTailers = append(closedTailers, tailer.getID()) + continue } - s.tailerMtx.RUnlock() + tailer.send(stream, s.labels) + } + s.tailerMtx.RUnlock() - if len(closedTailers) != 0 { - s.tailerMtx.Lock() - defer s.tailerMtx.Unlock() + if len(closedTailers) != 0 { + s.tailerMtx.Lock() + defer s.tailerMtx.Unlock() - for _, closedTailerID := range closedTailers { - delete(s.tailers, closedTailerID) - } + for _, closedTailerID := range closedTailers { + delete(s.tailers, closedTailerID) } - }() + } } } diff --git a/pkg/ingester/tailer.go b/pkg/ingester/tailer.go index 3e9a8a64cfd88..25fdfdb740d7a 100644 --- a/pkg/ingester/tailer.go +++ b/pkg/ingester/tailer.go @@ -17,13 +17,21 @@ import ( util_log "github.com/grafana/loki/pkg/util/log" ) -const bufferSizeForTailResponse = 5 +const ( + bufferSizeForTailResponse = 5 + bufferSizeForTailStream = 100 +) type TailServer interface { Send(*logproto.TailResponse) error Context() context.Context } +type tailRequest struct { + stream logproto.Stream + lbs labels.Labels +} + type tailer struct { id uint32 orgID string @@ -31,6 +39,7 @@ type tailer struct { pipeline syntax.Pipeline pipelineMtx sync.Mutex + queue chan tailRequest sendChan chan *logproto.Stream // Signaling channel used to notify once the tailer gets closed @@ -59,6 +68,7 @@ func newTailer(orgID string, expr syntax.LogSelectorExpr, conn TailServer, maxDr orgID: orgID, matchers: matchers, sendChan: make(chan *logproto.Stream, bufferSizeForTailResponse), + queue: make(chan tailRequest, bufferSizeForTailStream), conn: conn, droppedStreams: make([]*logproto.DroppedStream, 0, maxDroppedStreams), maxDroppedStreams: maxDroppedStreams, @@ -73,6 +83,9 @@ func (t *tailer) loop() { var err error var ok bool + // Launch a go routine to receive streams sent with t.send + go t.receiveStreamsLoop() + for { select { case <-t.conn.Context().Done(): @@ -102,6 +115,37 @@ func (t *tailer) loop() { } } +func (t *tailer) receiveStreamsLoop() { + defer t.close() + for { + select { + case <-t.conn.Context().Done(): + return + case <-t.closeChan: + return + case req, ok := <-t.queue: + if !ok { + return + } + + streams := t.processStream(req.stream, req.lbs) + if len(streams) == 0 { + continue + } + + for _, s := range streams { + select { + case t.sendChan <- s: + default: + t.dropStream(*s) + } + } + } + } +} + +// send sends a stream to the tailer for processing and sending to the client. +// It will drop the stream if the tailer is blocked or the queue is full. func (t *tailer) send(stream logproto.Stream, lbs labels.Labels) { if t.isClosed() { return @@ -117,16 +161,16 @@ func (t *tailer) send(stream logproto.Stream, lbs labels.Labels) { return } - streams := t.processStream(stream, lbs) - if len(streams) == 0 { - return + // Send stream to queue for processing asynchronously + // If the queue is full, drop the stream + req := tailRequest{ + stream: stream, + lbs: lbs, } - for _, s := range streams { - select { - case t.sendChan <- s: - default: - t.dropStream(*s) - } + select { + case t.queue <- req: + default: + t.dropStream(stream) } } diff --git a/pkg/ingester/tailer_test.go b/pkg/ingester/tailer_test.go index 674dde3df8af0..11de0d4daf82c 100644 --- a/pkg/ingester/tailer_test.go +++ b/pkg/ingester/tailer_test.go @@ -2,6 +2,7 @@ package ingester import ( "context" + "fmt" "math/rand" "sync" "testing" @@ -15,6 +16,55 @@ import ( "github.com/grafana/loki/pkg/logql/syntax" ) +func TestTailer_RoundTrip(t *testing.T) { + server := &fakeTailServer{} + + lbs := makeRandomLabels() + expr, err := syntax.ParseLogSelector(lbs.String(), true) + require.NoError(t, err) + tail, err := newTailer("org-id", expr, server, 10) + require.NoError(t, err) + var wg sync.WaitGroup + wg.Add(1) + go func() { + tail.loop() + wg.Done() + }() + + const numStreams = 1000 + var entries []logproto.Entry + for i := 0; i < numStreams; i += 3 { + var iterEntries []logproto.Entry + for j := 0; j < 3; j++ { + iterEntries = append(iterEntries, logproto.Entry{Timestamp: time.Unix(0, int64(i+j)), Line: fmt.Sprintf("line %d", i+j)}) + } + entries = append(entries, iterEntries...) + + tail.send(logproto.Stream{ + Labels: lbs.String(), + Entries: iterEntries, + }, lbs) + + // sleep a bit to allow the tailer to process the stream without dropping + // This should take about 5 seconds to process all the streams + time.Sleep(5 * time.Millisecond) + } + + // Wait for the stream to be received by the server. + require.Eventually(t, func() bool { + return len(server.GetResponses()) > 0 + }, 30*time.Second, 1*time.Second, "stream was not received") + + var processedEntries []logproto.Entry + for _, response := range server.GetResponses() { + processedEntries = append(processedEntries, response.Stream.Entries...) + } + require.ElementsMatch(t, entries, processedEntries) + + tail.close() + wg.Wait() +} + func TestTailer_sendRaceConditionOnSendWhileClosing(t *testing.T) { runs := 100