Coalesce timers to improve relay performance #545
Conversation
Some feedback to simplify this, which should also make things faster.
    // Wheel FIXME
    type Wheel struct {
I think we could simplify this further for our use case:
- we know what the maximum timeout is (2 mins), and we can reject timers set past this timeout.
- if we're thinking of starting with 5ms buckets, that means we'll only have 400 buckets
- since this is a pretty small number, having a lock per bucket seems reasonable
- whenever we need to add a timer, we have to bring the bucket into the cache line anyway, so having the mutex + bucket on the same cache line makes sense
So I think we can get rid of the bucket mask and lock mask, and possibly also the padded mutex, since we could do something like:
    type bucket struct {
        sync.Mutex
        timers        []*Timer
        timersBacking [1000]*Timer
    }
The timersBacking array would ensure no sharing by making bucket (way) too large to fit into a cache line, and means the initial timers are right next to the mutex.
In terms of memory usage, each bucket is 4 pointers + (size of timersBacking * pointer size) per bucket. If we go with 1000 timers, that's ~8k per bucket. 8k * 400 buckets = ~3.2MB, which seems pretty reasonable since we'll only have a single wheel per channel.
Since we have plenty of RAM, we could probably bump up timersBacking if needed (e.g., if we get ~50k requests per second in a channel, we'd probably want 50k/5ms = 10000, which is still reasonable).
Will try a struct with both a slice and its backing array next; switching from a linked list to pooled slices has actually made things ~30ns slower.
A struct of slice + backing array is also slower than a linked list. Part of the reason is that I think your arithmetic is off: 2m == 120s == 120,000ms == 24,000 buckets. At 8k per bucket, that's ~190MB of timers, which doesn't seem as reasonable.
My bad on the number of buckets; it's definitely a lot more memory to do that. We'll need to use a linked-list-like approach, although each list element could store multiple timers.
We need another benchmark that actually involves firing timers, since the optimizations I'm suggesting target the fired-timer path, and the current benchmark only looks at creating+stopping timers.
    // Stop FIXME
    func (t *Timer) Stop() {
        t.state.Store(stateCanceled)
If you use CAS(stateDefault, stateCanceled), you can return whether the timer was cancelled or not. That will let us have correct exactly-once call.End() semantics.
Good point, will do.
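For reference, a minimal sketch of a CAS-based Stop (assuming a sync/atomic-style Int32 with CompareAndSwap; the actual atomics package in the PR may differ):

    package timers

    import "sync/atomic"

    const (
        stateDefault int32 = iota
        stateCanceled
        stateExpired
    )

    // Timer is trimmed down to what this sketch needs.
    type Timer struct {
        state atomic.Int32
        f     func()
    }

    // Stop reports whether cancellation won the race against expiry, which is
    // what lets call.End() run exactly once.
    func (t *Timer) Stop() bool {
        return t.state.CompareAndSwap(stateDefault, stateCanceled)
    }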
        close(w.stopped)
    }

    func (w *Wheel) gatherTimers(startTick, endTick uint64) *Timer {
If we use a bucket type, and know everything in the bucket has expired, this gets a little simpler too:
- the wheel just has []*bucket
- when a tick fires, we swap out the *bucket pointer (probably pool it) for a new bucket in the wheel, and then start processing the old bucket
- processing the old bucket is just iterating over all timers; if the CAS(stateDefault, stateExpired) succeeds, we execute the function
- once we've executed everything, we reset the bucket's .timers = .timersBacking[:0] and put it in the bucket pool
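A rough, untested sketch of that tick path (field names, sizes, and the wheel layout are illustrative assumptions; the slot lock lives in the wheel rather than in the bucket here, just to keep the swap race-free in a short example):

    package timers

    import (
        "sync"
        "sync/atomic"
    )

    const (
        stateDefault int32 = iota
        stateCanceled
        stateExpired
    )

    // Timer is trimmed down to what the sketch needs.
    type Timer struct {
        state atomic.Int32
        f     func()
    }

    // bucket keeps the initial timer slots right next to the slice header.
    type bucket struct {
        timers        []*Timer
        timersBacking [64]*Timer // size is illustrative
    }

    func newBucket() *bucket {
        b := &bucket{}
        b.timers = b.timersBacking[:0]
        return b
    }

    // Wheel holds one lock and one bucket per slot, plus a pool of empty buckets.
    type Wheel struct {
        locks   []sync.Mutex
        buckets []*bucket
        pool    sync.Pool
    }

    // fireSlot swaps a fresh bucket into the slot, fires every timer in the old
    // bucket whose CAS(stateDefault -> stateExpired) succeeds, then recycles it.
    func (w *Wheel) fireSlot(idx int) {
        fresh, ok := w.pool.Get().(*bucket)
        if !ok {
            fresh = newBucket()
        }

        w.locks[idx].Lock()
        old := w.buckets[idx]
        w.buckets[idx] = fresh
        w.locks[idx].Unlock()

        for _, t := range old.timers {
            if t.state.CompareAndSwap(stateDefault, stateExpired) {
                t.f()
            }
        }
        old.timers = old.timersBacking[:0]
        w.pool.Put(old)
    }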
Force-pushed from 6f5b29e to 13bc016.
Yep, I'll add one. I'll need to pivot some of the APIs to accommodate benchmarking that part well.
Force-pushed from 13bc016 to 708d436.
Updated with a singly-linked list implementation, which is ~twice as fast as the standard lib when scheduling and canceling a timer, and modestly faster when firing timeouts (it's much harder to accurately benchmark the latter case):
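For reference, the standard-library side of such a comparison can be as simple as this (a generic baseline, not this PR's actual benchmark):

    package bench

    import (
        "testing"
        "time"
    )

    // Schedule and immediately cancel a standard-library timer per iteration,
    // mirroring the schedule+cancel path the wheel is being compared against.
    func BenchmarkStdlibTimerScheduleCancel(b *testing.B) {
        for i := 0; i < b.N; i++ {
            t := time.AfterFunc(time.Minute, func() {})
            t.Stop()
        }
    }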
Haven't really looked at unit tests yet -- I think we need more coverage to catch the bucket mask bug.
    timers *Timer
    // Avoid false sharing:
    // http://mechanical-sympathy.blogspot.com/2011/07/false-sharing.html
    pad [cacheline - unsafe.Sizeof(sync.Mutex{}) - unsafe.Sizeof(&Timer{})]byte
nit: can use _ to ensure we never try to use this field.
    const (
        // State machine for timers.
        stateDefault = iota
maybe this should be stateScheduled?
    func (w *Wheel) asTick(t time.Time) uint64 {
        return uint64(t.UnixNano()) / uint64(w.period)
if the w.period was a nice power of 2, like 2^22 nanoseconds (~4.19ms), you could probably do t.UnixNano() >> 22 rather than a divide here, since we already have a nextPowerOfTwo function.
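To make the shift concrete, a tiny standalone example (illustrative only, not the PR's code):

    package main

    import (
        "fmt"
        "time"
    )

    // With a tick period of 1<<22 ns (~4.19ms), dividing a nanosecond timestamp
    // by the period is the same as shifting it right by 22 bits.
    const tickShift = 22

    func asTick(t time.Time) uint64 {
        return uint64(t.UnixNano()) >> tickShift
    }

    func main() {
        now := time.Now()
        fmt.Println(asTick(now) == uint64(now.UnixNano())/(1<<tickShift)) // true
    }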
    func nextPowerOfTwo(n int64) (num int, exponent uint64) {
Since this only happens once on startup, it doesn't need to be super fast, so I'd prefer simplicity here:
    func nextPow2(n int) (num int, exponent uint64) {
        pow := uint64(0)
        for (1 << pow) < n {
            pow++
        }
        return 1 << pow, pow
    }
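As a quick worked example: for a 5ms period, nextPow2(5000000) returns (8388608, 23), i.e. the smallest power of two at or above the input (2^23 ns ≈ 8.39ms).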
            batch = next
        }
        if unexpired != nil {
            // We should only hit this case if we're a full wheel rotation
This is not a situation we ever want to be in, so we might need to think about how to "slow down" the caller till the wheel is ready (e.g., we hold the bucket lock until we're done processing, so anyone who tries to schedule a new timer blocks on the lock), or how we get metrics out of there so we know this is happening.
If we're a wheel rotation behind, then we either have a bug or way too much work to do in the process.
We can certainly get metrics out; by design, though, locking the bucket won't help much, since most timers are going into some other bucket with a different lock. We can introduce a global synchronization point, but I don't think that's worth the cost.
(No action needed, we should first export metrics before we try to solve this problem)
My thought was more:
- gatherBucket grabs the lock, but does not unlock it
- fire would receive a list of timers + mutex; after firing timers, it would unlock the bucket
Now if we're super behind, we stall the relay till we catch up. This is a little crazy for a few reasons though:
- We can't guarantee how long we'll wait for that lock; we could be waiting well past the timeout without returning any error frame or response
- More likely to have bugs where we never unlock, since the lock/unlock are spread apart and there's more potential for bugs
But the general idea is "no more timers in a bucket till the previous bucket is done processing them". We shouldn't really need this anytime soon though.
Sure, I think I understand what you're proposing. I'm pointing out that locking a single bucket doesn't actually stall the relay, since there are still 24k+ other buckets that they're pouring timers into. In most cases I can imagine, if the ticker thread is falling behind, it will continue to fall behind even if we put some backpressure on a tiny fraction of timer schedulers.
We could introduce a global coordination point for rate limiting, but I don't think that's worthwhile.
Agreed on deferring until we collect metrics.
    w.ticker.Stop()
    close(w.stop)
    <-w.stopped
    todo := w.gatherTimers(0, uint64(len(w.buckets)))
I don't think passing len(w.buckets) works, since you check whether each timer's deadline is > tick and, if so, leave it in the unexpired list. Since the timer's deadline is unmasked, it will almost always be > tick.
    var unexpired *Timer
    for batch != nil {
        next := batch.next
        if batch.f != nil && batch.deadline > tick {
why do we check batch.f != nil here and below? are there timers with no f set? Can we catch them before we insert them rather than in this loop?
    func (t *Timer) push(head *Timer) *Timer {
        if t == nil {
if t == nil, then doing head.next = t does the same as what's in this if. One of the advantages of pushing to the front is that you can always just do head.next = front; no need for a special case here.
    clock:       clock,
    period:      period,
    ticker:      clock.Ticker(period),
    bucketsMask: power,
Hmm, this is a pretty big bug; not sure why unit tests didn't catch this, but the mask should always be power - 1.
nextPowerOfTwo returns a power of 2; let's say it returns 32768. That's 0b1000000000000000. If you & with that value, which we do in AfterFunc, the result can only ever be 0 or 32768. If you subtract 1 from that value, you get 0b111111111111111, which is much better.
Can we make sure unit tests would catch this? I think we need to make sure that timers are firing roughly when we expect, not just that they eventually fired.
Ugh, bits are hard; both power and power-1 are wrong. As you point out in the meat of your comment, we actually want numBuckets-1.
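To make the indexing concrete, a small standalone snippet (with made-up numbers):

    package main

    import "fmt"

    func main() {
        // numBuckets must be a power of two, e.g. 32768. The index mask is
        // numBuckets-1 (all low bits set) -- not the exponent, and not the
        // exponent minus one.
        const numBuckets = 32768
        const bucketsMask = numBuckets - 1 // 0b111111111111111

        deadlineTicks := uint64(1234567)
        fmt.Println(deadlineTicks&bucketsMask == deadlineTicks%numBuckets) // true
    }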
Force-pushed from 708d436 to ed5d091.
Updated, now with fewer bit manipulation errors.
Force-pushed from 6e876de to 5eb5a0d.
Haven't fully reviewed the tests, but from a quick scan they looked good.
The benchmark numbers are looking good
    deadline uint64 // in ticks
    state    atomic.Int32
    next     *Timer
    tail     *Timer // only set on head
No action needed, but this is a little odd. The tail should be owned by a bucket, which is what keeps a reference to the head. We can clean this up later; let's leave a TODO.
    // Stop cancels the deferred operation. It returns whether or not the
    // cancellation succeeded.
    func (t *Timer) Stop() bool {
        if t == nil {
do we ever expect this case?
    if head == nil {
        return t
    }
    if head.tail == nil {
There are a lot of cases in this function, and I think a lot of them are to make pushOne work, so perhaps they should just be separate methods? Something like:
    func (t *Timer) pushOne(node *Timer) *Timer {
        node.next = t
        if t == nil {
            node.tail = node
        } else {
            node.tail = t.tail
            t.tail = nil
        }
        return node
    }

    func (t *Timer) push(head *Timer) *Timer {
        if head == nil {
            return t
        }
        if t == nil {
            return head
        }
        head.next = t
        head.tail = t.tail
        t.tail = nil // t is no longer a head, so clear its tail
        return head
    }
(I haven't tested the logic, but I think it's possible to write this with a lot less branching and complexity)
I may be misunderstanding, but I think that's only sort of the case; making push simpler just moves complexity to gatherBuckets. When we're adding any unexpired timers back into the bucket, we'd need to walk a bit of the list to see if it's a single element or >1 element, using push or pushOne as appropriate. To avoid that, we could hold the lock on the bucket for the entire method, rather than scoping it as narrowly as it is now.
Hmm, not sure I understand what you mean; let me describe what I'm thinking, I might be misunderstanding something. If it's still confusing, let's sync up offline.
gatherBuckets first replaces the bucket's list with nil under a lock. If any timers are scheduled in the bucket afterwards, the bucket will be a separate linked list (with a correct tail).
We then iterate over the scheduled items in the bucket, and build up 2 linked lists using pushOne, one for expired and one for unexpired. In my proposed implementation of pushOne, the node is always pushed to the head, and the tail is maintained correctly. So we end up with 2 linked lists, both of which have a correct tail. We don't actually need to maintain tail for the expired list, but that's a bit of a micro-optimization we can worry about later :)
At this point, the unexpired list is either nil or it's a *Timer with tail set correctly. Now gatherBuckets will get the lock again (if the unexpired list has nodes) and will do:

    b.timers = b.timers.push(unexpired)

This basically prepends the items in the bucket with the unexpired list, which works fine since both lists have tail maintained correctly.
This is assuming pushOne and push are implemented as I suggested above, with the following assumptions:
- push is called on the head of one linked list, and is passed the head of another linked list as an argument
- pushOne is called on the head of one linked list, and is passed any node. If the passed-in node is part of another linked list, that other linked list will be "broken"
- The head of a linked list is either nil or has a non-nil tail. The tail may point back to the head.
Given that, the functions maintain the tail correctly, so we should not need to walk anything.
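Putting that together, a compact, untested sketch of the gather step (reusing the pushOne/push shapes above; the Timer and bucket fields are assumptions):

    package timers

    import "sync"

    type Timer struct {
        deadline uint64
        next     *Timer
        tail     *Timer // only set on the head
        f        func()
    }

    type bucket struct {
        sync.Mutex
        timers *Timer
    }

    func (t *Timer) pushOne(node *Timer) *Timer {
        node.next = t
        if t == nil {
            node.tail = node
        } else {
            node.tail = t.tail
            t.tail = nil
        }
        return node
    }

    func (t *Timer) push(head *Timer) *Timer {
        if head == nil {
            return t
        }
        if t == nil {
            return head
        }
        head.next = t
        head.tail = t.tail
        t.tail = nil
        return head
    }

    // gather splits the bucket's timers into expired and unexpired lists,
    // pushes the unexpired ones back into the bucket, and returns the rest.
    func (b *bucket) gather(tick uint64) (expired *Timer) {
        b.Lock()
        batch := b.timers
        b.timers = nil
        b.Unlock()

        var unexpired *Timer
        for batch != nil {
            next := batch.next
            if batch.deadline > tick {
                unexpired = unexpired.pushOne(batch)
            } else {
                expired = expired.pushOne(batch)
            }
            batch = next
        }

        if unexpired != nil {
            b.Lock()
            b.timers = b.timers.push(unexpired)
            b.Unlock()
        }
        return expired
    }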
    func (bs *bucketList) Schedule(deadline uint64, f func()) *Timer {
        if f == nil {
Do we need to support this use case? The standard library does not, and I don't think we should either: https://play.golang.org/p/OdxBnyCzXQ
Seems like a code bug if someone is scheduling timers to do nothing.
Sure.
    b := &bs.buckets[deadline&bs.mask]
    b.Lock()
    b.timers = b.timers.push(t)
should probably use pushOne here
Equivalent here, but pushOne is certainly more explicit.
    func fakeWork() {} // sentinel non-nil func to schedule
I don't think this is a sentinel value, just a noop func. Sentinel implies that something else is looking for this specific value, but this is just a normal function as far as the timer wheel is concerned.
Sure, will rename.
    defer w.Stop()

    var wg sync.WaitGroup
    cb := func() { wg.Done() }
cb := wg.Done, no need for a closure; the compiler will do that for you.
    for i := range ds {
        ds[i] = ds[i] + 24*time.Hour
    }
    for i := range ds {
Is there a reason for a separate for loop rather than moving this logic into the above one?
    func BenchmarkStandardLibraryTimerExpiry(b *testing.B) {
Since the standard library doesn't use the mock clock, we should just use the real clock in our real timer wheel, and compare the numbers. It seems odd to advance a mock clock that's not used, but then use real time in this benchmark while the timer wheel benchmark ignores real time.
Sure, I can do that. The numbers don't end up being comparable, though, since the wheel ends up spending a bunch of time sleeping until the next tick.
This approach makes the benchmarks odd, but the results more directly comparable.
Ah that makes it a little trickier. Let's land this and we can improve it separately.
    // We fire all pending timers when we shut down the relay, which
    // includes the timeout for the blocked handler. That unblocks the
    // handler before we get to the call above, which obviates the point of
    // this test.
Maybe? I think that's what this test was exercising. Separately, we weren't waiting on wg anywhere, but I think this is the right spot.
We should consider whether we really want to fire all timers when stopping the wheel, since that times out all in-flight calls.
Hmmm, that would make this test flaky since the AssertEcho would fail. If you run this test many times, does it fail?
I am quite sure that the current approach has a race between shutting down timers and new timers being scheduled, though:
- Relay gets a call, picks a connection (everything is active at this point)
- Channel is shut down, connections enter the close state machine, wheel is stopped
- Wheel fires all existing timers
- The previous call gets to the addRelayItem call and adds a new item to the wheel
We now have a pending relay item with an unfired timer that will never fire. Since the complete channel shutdown waits on all exchanges to be removed and the wheel is stopped, we're now depending on the remote side to complete the exchange. If they don't, we'll be stuck forever. #390 makes this worse, since the connection could have failed, but we don't clear the relay items and instead wait for a timeout which will never happen.
If we really want graceful shutdown, then it seems like the wheel should be active until the channel is completely stopped (e.g., channel state is set to ChannelClosed).
We really shouldn't have active timers at that point, but just in case, I think we should fire them. Ideally, any timers scheduled after the wheel has shut down also fire immediately. In the future, when we change AfterFunc to take an interface, we can have a separate method Shutdown, so instead of Fire, it has a separate path (see the sketch below).
(Since we actually shut down the ephemeral relays in the Hyperbahn emulation, the shutdown path is more critical than it was previously)
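One possible shape for that interface, purely as an illustration of the idea (not an API in this PR):

    package timers

    // Expirable sketches the interface-based callback floated above: normal
    // expiry and wheel shutdown get separate code paths.
    type Expirable interface {
        // Fire runs when the deadline elapses: fail the call with a timeout.
        Fire()
        // Shutdown runs when the wheel is stopping: fail fast instead of
        // reporting a timeout that never really happened.
        Shutdown()
    }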
    @@ -565,6 +565,12 @@ func TestRelayRejectsDuringClose(t *testing.T) {
        require.Error(t, err, "Expect call to fail after relay is shutdown")
        assert.Contains(t, err.Error(), "incoming connection is not active")
        close(block)
        wg.Wait()
Definitely an oversight on my part.
Why have a WaitGroup if you never Wait on it? That might be a good linter rule, or maybe just a philosophical question.
    assert.Equal(t, 1, root.len(), "Unexpected length after pushing nil to a len-1 root.")
    assert.Equal(t, originalRoot, root)
    assert.Nil(t, root.next)
    assert.Nil(t, root.tail)
push should only be used with a valid head which has a tail pointer. pushOne, on the other hand, could work with a node that is not a head.
    for i := range scheduled {
Since you have a slice of *Timer, you should probably do for _, t := range scheduled { t.Stop() }
    for elapsed := time.Duration(0); elapsed < maxDelay; elapsed += tickDuration {
since you never use elapsed and maxDelay in the body of the loop, why not just make this simpler:
    for i := 0; i < maxTimers; i++ {
        wg.Add(1)
        c.Add(tickDuration)
        wg.Wait()
    }
    for _, d := range timeouts {
        w.AfterFunc(d, func() { wg.Done() })
    }
    c.Add(maxDelay * 2)
c.Add has a loop that unblocks timers individually and runs runtime.Gosched(). Is the Gosched not enough time to do the channel read + run a single timeout (which is non-blocking)?
It might work, I'm just not sure, so I would like to at least temporarily add logs to validate what it's doing.
    c.Add(maxDelay * 2)
    wg.Wait()
We need a test to validate the behaviour when we're behind on doing work. What happens when the AfterFunc blocks, and we go past the watermark?
Already covered in TestTimerDroppingTicks.
Force-pushed from eb30dec to 76b3a10.
Ready for another round of review. Planning follow-on PRs for:
Tests are failing on Go tip, and I'm not quite sure why: https://travis-ci.org/uber/tchannel-go/jobs/184112308#L939 These tests and the code they exercise look correct to me, and pass with
Production code looks good; I think there are some helper test functions that should help with more validation. Otherwise, looks good.
    @@ -18,36 +18,41 @@
     // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
     // THE SOFTWARE.

    -package tchannel
    +package tchannel_test
Are these changes only for the Sleep after closing the channels?
If we need to keep this, can we dot-import "tchannel-go" like in other tests, so that the test code itself doesn't change?
Yep, just for the sleep with testutils. Happy to dot-import.
    @@ -57,6 +62,7 @@ func TestAllChannelsRegistered(t *testing.T) {
        ch1_1.Close()
        ch2_1.Close()
        ch2_2.Close()
        time.Sleep(testutils.Timeout(10 * time.Millisecond))
Why the sleeps?
Since we moved Channel.onClosed to run async, we can't guarantee that Close immediately removes the channel from the introspection results.
Since this is the only place we're relying upon the current behavior, I opted to make these tests uglier instead of making the production code more complex. Alternatively, I can keep the call to removeClosedChannel in the Close method instead of putting it in onClosed, but that seems weird - introspection doesn't match reality.
        tombs uint64
        items map[uint32]relayItem
    }

    -func newRelayItems(logger Logger) *relayItems {
    +func newRelayItems(wheel *timers.Wheel, logger Logger) *relayItems {
super nit: ordering of args is wheel, logger but ordering of fields in the struct and initializer is logger, wheel
Sure, okay.
    func newWheel(period, maxTimeout time.Duration, clock clock.Clock) *Wheel {
        tickNanos, power := nextPowerOfTwo(int64(period))
For maxTimeout, we round up to be conservative and give us a higher max than what was specified, so we can just mask. For period, I think the conservative option would be to round down, right? If the user asks for 5ms, using a tick period of ~4ms would be better than what the user asked for.
I think that can be done easily by doing:

    tickNanos, power := nextPowerOfTwo(int64(period)/2 + 1)

If the user specifies 5ms, we would find the next power of 2 after 2.5ms, which is 4ms. If the user specifies an exact power of 2 like 4.194..ms, then we divide by 2 but then add 1 so we don't select ~2ms.
Makes sense.
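A tiny standalone check of that rounding, using the nextPow2 suggested earlier in the thread:

    package main

    import (
        "fmt"
        "time"
    )

    func nextPow2(n int) (num int, exponent uint64) {
        pow := uint64(0)
        for (1 << pow) < n {
            pow++
        }
        return 1 << pow, pow
    }

    func main() {
        // For a requested 5ms period, this picks 2^22ns (~4.19ms) rather than
        // rounding up to 2^23ns (~8.39ms).
        period := 5 * time.Millisecond
        tickNanos, power := nextPow2(int(period)/2 + 1)
        fmt.Println(tickNanos, power) // 4194304 22
    }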
    func newWheel(period, maxTimeout time.Duration, clock clock.Clock) *Wheel {
        tickNanos, power := nextPowerOfTwo(int64(period))
        numBuckets, _ := nextPowerOfTwo(int64(maxTimeout) / tickNanos)
        if time.Duration(numBuckets*tickNanos) < maxTimeout {
If we use the conservative approach for ticks above, this case should never be hit, so we can remove it.
    assert.Equal(t, 1, root.len(), "Unexpected length after pushing one timer to nil root.")
    assert.Equal(t, head, root, "Expected pushOne with nil receiver to return head.")
    assert.Nil(t, root.next, "Expected one-node list to have a nil next link.")
    assert.Equal(t, root, root.tail, "Expected head of single-node list to be its own tail.")
Instead of just a len method, we should have a method that does more validation and is useful for more tests:
    func assertLinkedList(t *testing.T, head *Timer, expectedLen int) {
        if expectedLen == 0 {
            assert.Nil(t, head, "Empty linked list should be nil")
            return
        }
        nodes := 0
        last := head
        for cur := head; cur != nil; cur = cur.next {
            if cur != head {
                assert.Nil(t, cur.tail, "Only the head should have a tail set")
            }
            nodes++
            last = cur
        }
        // Verify tail is actually the last node.
        assert.Equal(t, head.tail, last, "Tail pointer is incorrect")
        assert.Equal(t, expectedLen, nodes, "Number of nodes is incorrect")
    }
Then, the validation here would really only be:
    var root *Timer
    assertLinkedList(t, root, 0)

    head := newTimer(0, nil)
    root = root.pushOne(head)
    assertLinkedList(t, root, 1)
    assert.Equal(t, head, root, [...])
    assert.Panics(...)
There's no need to verify the next and tail since assertLinkedList will do that. The same is true for other tests.
Good point. Added in an explicit test for cycles too.
Force-pushed from ee22c6f to d984dd0.
Fun story: I think we're putting too much load on our Travis containers. Tests are failing with no error message beyond "signal: killed", which sounds like https://docs.travis-ci.com/user/common-build-problems/#My-build-script-is-killed-without-any-error.
Force-pushed from 1a505a0 to d85e1d8.
Git surgery complete - unrelated test fixes are in #551. Once that's merged and this PR is rebased, tests should pass.
Force-pushed from bbce95e to 4b94a2a.
Add a timer wheel package that's more efficient (though more coarse-grained) than the standard library's. Also add the necessary dependencies.
Force-pushed from 2a136c7 to b3f4510.
Force-pushed from a7019f6 to 615295f.
This degree of perf doesn't seem to be necessary, so there's virtually no chance we'll spend the time required to get to the bottom of these Travis failures. Closing.
This diff replaces the relayer's use of the standard lib's time.Timer with a custom timer wheel. The wheel reduces the overhead of maintaining timers in four ways. The tradeoffs are, of course, that we coarsen timers to fire only every ~5ms, and we run a greater risk of blocking the tick thread when callbacks block.
With 10k relayed calls in flight, benchmarks suggest that this implementation is 50% faster than the standard lib:
/cc @prashantv @billf @monicato @witriew