feat: ingester-RF1 Push endpoint #13315
Conversation
@@ -4,6 +4,7 @@ server:
  http_listen_port: 3100
  grpc_listen_port: 9096
  log_level: debug
  grpc_server_max_concurrent_streams: 1000
This is required because the server is handling far more requests concurrently now that they wait for a segment flush before returning. It'll need to be even higher in real environments!
@@ -540,7 +578,25 @@ func (i *Ingester) loop() {
	for {
		select {
		case <-flushTicker.C:
			i.sweepUsers(false, true)
			//i.logger.Log("msg", "starting periodic flush")
			i.flushCtx.lock.Lock() // Stop new chunks being written while we swap destinations - we'll never unlock as this flushctx can no longer be used.
This section is the key part of the new approach: it is responsible for rotating the WAL segment every 500ms.
The general idea is to grab a Write lock (preventing API callers from using the current segment) and to create a new one for clients as fast as possible. The flush of the old segment happens in the background.
Closing of channels is used for signalling the waiting API callers, because we can select on them or on the calling context to handle timeouts.
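Roughly, that rotation step looks like the sketch below. This is only an illustration: the flushCtx field names follow the diff, while the `ingester` shape, the `flushQueue` handoff, and the `segmentWriter` stand-in are assumptions.

```go
package sketch

import "sync"

// Assumed shape of the flush context, following the fields shown in the diff.
type flushCtx struct {
	lock            *sync.RWMutex
	flushDone       chan struct{} // closed once this segment has been flushed
	newCtxAvailable chan struct{} // closed once a replacement flushCtx is installed
	segmentWriter   *segmentWriter
}

type segmentWriter struct{} // stand-in for the WAL segment writer

type ingester struct {
	flushCtx   *flushCtx
	flushQueue chan *flushCtx // drained by the flush loops (assumed handoff)
}

// rotate runs on each flush tick: block new appends to the old segment, swap
// in a fresh flushCtx as fast as possible, then hand the old one off to be
// flushed in the background.
// NOTE: synchronisation of the i.flushCtx pointer read/write is elided here.
func (i *ingester) rotate() {
	old := i.flushCtx
	old.lock.Lock() // never unlocked again: this flushCtx can no longer be used

	i.flushCtx = &flushCtx{
		lock:            &sync.RWMutex{},
		flushDone:       make(chan struct{}),
		newCtxAvailable: make(chan struct{}),
		segmentWriter:   &segmentWriter{},
	}
	close(old.newCtxAvailable) // wake any Push that raced with the swap

	i.flushQueue <- old // the flush loops write the segment and close flushDone
}
```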
I think the Timer must be created as part of the context/request so that we can handle multiple flush timeouts for different sets of requests.
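For concreteness, the wait on the API-caller side in this scheme is a per-request select (a sketch reusing the flushCtx shape above; the `context` import is assumed), so each request's own deadline bounds the wait rather than a shared timer:

```go
// waitForFlush blocks until the segment this request appended to has been
// flushed, or until the request's own context times out or is cancelled.
func waitForFlush(ctx context.Context, fc *flushCtx) error {
	select {
	case <-fc.flushDone: // closed by the background flush of this segment
		return nil
	case <-ctx.Done(): // per-request deadline; no shared timer needed
		return ctx.Err()
	}
}
```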
I think we can actually forget the flush loop and enqueueing for now.
We probably also want to flush based on input size to limit block size? I think we can start with just this for now.
See my comment below on the multiple flushes.
I do need to add the input size to this, yes! That was my next task, but I wanted to get early feedback on this XXL PR first - ideally we can parallelise more work on top of this starting point.
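A size-based trigger on top of the periodic one could be as simple as the hypothetical check below; the threshold and the bytesWritten accounting are not part of this PR and are shown only to illustrate the idea, building on the rotate() sketch above.

```go
const maxSegmentBytes = 8 << 20 // purely illustrative threshold

// maybeRotateBySize is a hypothetical early-rotation check, driven by how
// much has been appended to the current segment.
func (i *ingester) maybeRotateBySize(bytesWritten int64) {
	if bytesWritten >= maxSegmentBytes {
		i.rotate()
	}
}
```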
// The only time the Write Lock is held is when this context is no longer usable and a new one is being created.
// In this case, we need to re-read i.flushCtx in order to fetch the new one as soon as it's available.
// The newCtxAvailable chan is closed as soon as the new one is available to avoid a busy loop.
currentFlushCtx := i.flushCtx
This is the other main piece of the new approach:
API callers grab the current flush context (which includes a reference to the current WAL segment) by obtaining a Read lock.
The RWMutex is used backwards, which could be improved, but it allows many API clients to write to the WAL segment at once (wal.Append is thread-safe) and lets the ingester block new appends by grabbing the Write lock.
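Put concretely, the append side does roughly the following (a sketch reusing the flushCtx shape above; TryRLock stands in for however the real code detects that the write lock is held):

```go
// getFlushCtx returns a flushCtx that is safe to append to. If the current one
// is being retired (its write lock is held and will never be released), wait
// for the replacement to be published and re-read i.flushCtx.
func (i *ingester) getFlushCtx() *flushCtx {
	for {
		cur := i.flushCtx
		if cur.lock.TryRLock() {
			return cur // read lock held: rotation waits until we RUnlock
		}
		<-cur.newCtxAvailable // closed once the new flushCtx exists; avoids a busy loop
	}
}
```

The caller would then Append to cur.segmentWriter, release the read lock, and wait on flushDone as in the select sketched earlier.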
sortedLabels := i.index.Add(logproto.FromLabelsToLabelAdapters(labels), fp)

chunkfmt, headfmt, err := i.chunkFormatAt(minTs(&pushReqStream))
I was unsure what most of this chunk-related code did - it can almost certainly be removed, but the key functionality, such as tracking metrics, should be retained.
			entries: make([]*logproto.Entry, 0, 4096),
		}
	},
}
tenantLabel = "__loki_tenant__"
Had to copy this from tsdb.TenantLabel because of a circular import between tsdb & wal via indexWriter.
flushCtx: &flushCtx{
	lock:            &sync.RWMutex{},
	flushDone:       make(chan struct{}),
	newCtxAvailable: make(chan struct{}),
	segmentWriter:   segmentWriter,
},
Interesting choice here.
I thought we would need multiple flush routines per ingester to be able to flush faster; right now, if a single flush slows down (which happens), all other requests will be affected.
Not a problem though, let's put it to the test in dev.
We do have multiple flush routines - the idea was to have one "active" flushCtx at a time, which all Pushes are Appending to.
Every time the timer fires, it will swap the flushCtx for a new one (new WAL segment, etc.) as fast as possible - it's just a variable assignment - and then hand off the old flushCtx to be flushed async.
I fully expect flushing to storage to be the slowest part of this, so I'm reusing the existing flush loops. We can have multiple flushes happening in parallel, and only the Pushes that appended to that flush would be waiting. New Pushes would be unaffected, as they'd be Appending to a completely different flushCtx.
Definitely open to suggestions if you have better ideas though!
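Shape-wise, that handoff amounts to a queue of retired flushCtxs drained by several workers (a sketch; the queue and worker count are assumptions about how the existing flush loops are reused):

```go
// runFlushLoops drains retired flushCtxs. A slow flush only delays the Pushes
// that appended to that particular segment, because each Push waits on the
// flushDone channel of its own flushCtx.
func (i *ingester) runFlushLoops(workers int, flush func(*segmentWriter) error) {
	for w := 0; w < workers; w++ {
		go func() {
			for fc := range i.flushQueue {
				if err := flush(fc.segmentWriter); err != nil {
					// the real code would retry and record metrics here
				}
				close(fc.flushDone) // release only this segment's waiting Pushes
			}
		}()
	}
}
```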
if err != nil {
	return err
}
return o.store.PutObject(ctx, "wal-segment-"+time.Now().UTC().Format(time.RFC3339Nano), bytes.NewReader(buffer.Bytes()))
Can the new ingester deal with this interface instead of PutObject? This way we don't leak our storage into object storage clients.
I think that's a good idea, but for now I was just copying the existing interfaces, which use "PutChunks" everywhere. There is a slightly lower-level interface which only has PutObject, but our storage layer seems to switch between them frequently.
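The separation being discussed (having the WAL writer depend on a narrow interface rather than a concrete storage client) could look like the hypothetical sketch below; the name and method are illustrative, not the actual change made in b4ca98b.

```go
import (
	"context"
	"io"
)

// Hypothetical narrow interface: the WAL writer only needs "store a segment",
// so object-storage client details stay behind whatever adapter implements it.
type walSegmentStore interface {
	PutWALSegment(ctx context.Context, name string, data io.Reader) error
}
```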
I addressed it in b4ca98b
LGTM
Let's merge and improve from there. Build is still failing.
What this PR does / why we need it:
Which issue(s) this PR fixes:
Fixes https://github.com/grafana/loki-private/issues/1016