-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Tail queue #11930
Tail queue #11930
Conversation
67f851e
to
d8eccd3
Compare
d8eccd3
to
9e39142
Compare
@@ -288,30 +288,28 @@ func (s *stream) recordAndSendToTailers(record *wal.Record, entries []logproto.E | |||
hasTailers := len(s.tailers) != 0 | |||
s.tailerMtx.RUnlock() | |||
if hasTailers { | |||
go func() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm I don't understand the surrounding code, but from the recordAndSendToTailers()
name and from the previous usage of a goroutine here, I assume this part should be as non-blocking as possible? If so, does it make sense to keep the goroutine here? We'd still handle the actual sending over a queue, but this part won't be blocking either.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are removing the goroutine and sending the stream to the queue. Since the queue is a buffered channel, it's a non-blocking opt (if chan is full, the stream is dropped)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM in general, though I don't understand enough of the context to give it a proper 👍
pkg/ingester/tailer.go
Outdated
for { | ||
select { | ||
case <-t.conn.Context().Done(): | ||
t.close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think maybe t.close() should be in a defer
block so that anywhere we return from this function it would be called?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
one change to close the the tailer if we exit the receive loop for any reason, otherwise LGTM!
No description provided.